Posted to dev@spark.apache.org by 163 <he...@163.com> on 2017/07/20 08:59:52 UTC

How to tune the performance of the TPC-H Query 5 multi-DataFrame join in Spark

Thank you, @Pralabh Kumar. I tried reading the parquet files in parallel, and the execution time dropped by about 15s, from 1.5 minutes to 1.3 minutes.
I also followed the Spark tuning link: with "spark.sql.shuffle.partitions" set to 320/480, the running time dropped further to 1.1 minutes.

My configuration on conf/spark-default.conf:
spark.local.dir /dev/shm/hewenting/tmp
spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=400
spark.sql.codegen.wholeStage false
spark.sql.shuffle.partitions 480
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.default.parallelism 100
spark.scheduler.mode FAIR
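
For quick experiments, the shuffle-partition setting can also be changed per session at runtime instead of in conf/spark-default.conf; a minimal sketch, using the 480 value from the listing above:

  // Runtime equivalent of the spark.sql.shuffle.partitions entry above.
  spark.conf.set("spark.sql.shuffle.partitions", "480")
  // Verify the value the active session will use for shuffles.
  println(spark.conf.get("spark.sql.shuffle.partitions"))

320 and 480 are 2x and 3x the cluster's 160 total cores, which is presumably why they beat the default of 200 here.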

Below is the main running job. How can I speed up this query further?




My code:
  import java.util.ArrayList
  import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

  import scala.collection.mutable.ArrayBuffer

  import org.apache.spark.sql.{Dataset, Row, SparkSession}
  import org.apache.spark.sql.functions.{lit, sum}

object TpchQuery5 {
  def main(args: Array[String]): Unit = {
    // Callable that loads one parquet table, so several tables can be read concurrently.
    case class readData(fileName: String, spark: SparkSession) extends Callable[Dataset[Row]] {
      override def call(): Dataset[Row] = {
        spark.read.parquet(fileName)
      }
    }

    val spark = SparkSession.builder()
      .appName("practice")
      .config("spark.master", "spark://10.61.2.127:7077")
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()
    import spark.implicits._

    // Submit the six table reads to a fixed-size thread pool so they run in parallel.
    val pool: ExecutorService = Executors.newFixedThreadPool(6)
    val list = new ArrayList[Future[Dataset[Row]]]()

    for (fileName <- "orders,lineitem,customer,supplier,region,nation".split(",")) {
      val o1 = new readData("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/" + fileName, spark)
      val tmp: Future[Dataset[Row]] = pool.submit(o1)
      list.add(tmp)
    }

    // Block on each Future and collect the DataFrames in submission order.
    val rddList = new ArrayBuffer[Dataset[Row]]()
    for (i <- 0 to 5) {
      rddList += list.get(i).get()
    }
    pool.shutdown()
    pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)

    // TPC-H Q5 filters: orders from 1994, region ASIA.
    val forders = rddList(0).filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'").select("o_custkey", "o_orderkey")
    val flineitem = rddList(1).select("l_orderkey", "l_suppkey", "l_discount", "l_extendedprice")
    val fregion = rddList(4).where("r_name = 'ASIA'").select("r_regionkey")
    val fnation = rddList(5).select("n_nationkey", "n_regionkey", "n_name")
    val fsupplier = rddList(3).select("s_nationkey", "s_suppkey")
    val fcustomer = rddList(2).select("c_custkey", "c_nationkey")

    // Join the six tables and compute revenue per nation, sorted descending.
    val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
      .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
      .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
      .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
      .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
      .select($"n_name", ($"l_extendedprice" * (lit(1) - $"l_discount")).as("value"))
      .groupBy($"n_name")
      .agg(sum($"value").as("revenue"))
      .sort($"revenue".desc)

    res.collect().foreach(println)
  }
}
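
For reference, a minimal sketch of inspecting the physical plan of the res DataFrame above (e.g. added just before the collect), which shows which join strategies Spark chose, such as whether the small region and nation tables are broadcast:

    // Print the physical plan; broadcast vs. sort-merge joins show up here.
    res.explain()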




> On 17 Jul 2017, at 9:50 PM, Pralabh Kumar <pr...@gmail.com> wrote:
> 
> Hi
> 
> To read the files in parallel, you can follow the code below.
> 
> 
>  import java.util
>  import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
> 
>  import scala.collection.JavaConverters._
>  import scala.collection.mutable.ArrayBuffer
> 
>  import org.apache.spark.sql.{Dataset, Row, SparkSession}
> 
>  // Callable that reads one parquet (or csv) file into a DataFrame.
>  case class readData(fileName: String, spark: SparkSession) extends Callable[Dataset[Row]] {
>    override def call(): Dataset[Row] = {
>      spark.read.parquet(fileName)
>      // spark.read.csv(fileName)
>    }
>  }
> 
>  val spark = SparkSession.builder()
>    .appName("practice")
>    .config("spark.scheduler.mode", "FAIR")
>    .enableHiveSupport().getOrCreate()
> 
>  // Submit all six reads to a thread pool so they run concurrently.
>  val pool = Executors.newFixedThreadPool(6)
>  val list = new util.ArrayList[Future[Dataset[Row]]]()
> 
>  for (fileName <- "orders,lineitem,customer,supplier,region,nation".split(",")) {
>    val o1 = readData(fileName, spark)
>    list.add(pool.submit(o1))
>  }
> 
>  // Block on each Future and gather the resulting DataFrames.
>  val rddList = new ArrayBuffer[Dataset[Row]]()
>  for (result <- list.asScala) {
>    rddList += result.get()
>  }
> 
>  pool.shutdown()
>  pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
> 
>  for (finalData <- rddList) {
>    finalData.show()
>  }
> 
> 
> This will read the data in parallel; I think the sequential reads are your main bottleneck.
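> 
> The same pattern can also be written with Scala Futures instead of a Java thread pool; a minimal sketch, assuming the spark session and table names from the code above:
> 
>    import scala.concurrent.{Await, Future}
>    import scala.concurrent.duration.Duration
>    import scala.concurrent.ExecutionContext.Implicits.global
> 
>    val tables = "orders,lineitem,customer,supplier,region,nation".split(",")
>    // Kick off all six parquet reads concurrently.
>    val futures = tables.map(name => Future(spark.read.parquet(name)))
>    // Block until every read finishes, keeping the original table order.
>    val dfs = futures.map(f => Await.result(f, Duration.Inf))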
> 
> Regards
> Pralabh Kumar
> 
> 
> 
> On Mon, Jul 17, 2017 at 6:25 PM, vaquar khan <vaquar.khan@gmail.com> wrote:
> Could you please let us know your Spark version?
> 
> 
> Regards, 
> vaquar khan 
> 
> 
> On Jul 17, 2017 12:18 AM, "163" <hewenting_ict@163.com> wrote:
> I changed the UDF, but the performance still seems slow. What else can I do?
> 
> 
>> On 14 Jul 2017, at 8:34 PM, Wenchen Fan <cloud0fan@gmail.com> wrote:
>> 
>> Try replacing your UDF with Spark built-in expressions; it should be as simple as `$"x" * (lit(1) - $"y")`.
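>> 
>> For illustration, a minimal before/after sketch using the two columns from the query below (df stands for any DataFrame that has them; assumes spark.implicits._ is in scope for the $ syntax):
>> 
>>   import org.apache.spark.sql.functions.{lit, udf}
>> 
>>   // Before: an opaque UDF that Catalyst cannot optimize or codegen.
>>   val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>>   df.select(decrease($"l_extendedprice", $"l_discount").as("value"))
>> 
>>   // After: the same arithmetic as built-in Column expressions, visible to the optimizer.
>>   df.select(($"l_extendedprice" * (lit(1) - $"l_discount")).as("value"))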
>> 
>>> On 14 Jul 2017, at 5:46 PM, 163 <hewenting_ict@163.com> wrote:
>>> 
>>> I rewrote TPC-H Query 5 with the DataFrame API:
>>> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders").filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'").select("o_custkey", "o_orderkey")
>>> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
>>> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
>>> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
>>> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region").where("r_name = 'ASIA'").select($"r_regionkey")
>>> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>>> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>>> val res =   flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>>>      .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>>>      .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>>>      .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>>>      .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>>>      .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>>>      .groupBy($"n_name")
>>>      .agg(sum($"value").as("revenue"))
>>>      .sort($"revenue".desc).show()
>>> 
>>> My environment is one master (HDFS namenode) and four workers (HDFS datanodes), each with 40 cores and 128 GB of memory. The TPC-H 100 GB data set is stored on HDFS in parquet format.
>>> The query executed in about 1.5 minutes. I found that reading these 6 tables with spark.read.parquet is sequential; how can I make the reads run in parallel?
>>> I've already set data locality, spark.default.parallelism, and spark.serializer, and switched to G1 GC, but the runtime is still not reduced.
>>> Is there any other advice for tuning this performance?
>>> Thank you.
>>> 
>>> Wenting He
>>> 
>> 
> 
>