Posted to user@spark.apache.org by Chenghao <ch...@cs.umass.edu> on 2019/03/23 06:51:59 UTC

[spark context / spark sql] unexpected disk IO activity after spark job finished but spark context has not

Hi, 

I have a SparkSQL workload that I ran as a batch job in two cases. In the
first case, I executed the workload and stopped the batch job as soon as
`.show()` finished. In the second case, I executed the same workload but
called a 1-minute sleep, `Thread.sleep(60000)`, before stopping the Spark
context and the batch job. The workload takes a similar amount of time in
both cases, but in the second case I detected an unexpected DISKBUSY spike
on the disk that holds the Spark local storage, using the system monitoring
tool "nmon", as shown in the second figure.
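
To line the nmon trace up with the phases of the job, it can help to print
wall-clock timestamps around each phase. The following is a minimal sketch in
plain Scala; `PhaseLog` and `timed` are hypothetical helper names (not part of
Spark), and the labels are only examples.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper (not part of Spark): prints wall-clock timestamps
// around a block so each phase can be matched against the nmon timeline.
object PhaseLog {
  def timed[A](label: String)(block: => A): A = {
    val start = Instant.now()
    println(s"[$start] $label: start")
    try block
    finally {
      val end = Instant.now()
      val elapsedMs = Duration.between(start, end).toMillis
      println(s"[$end] $label: end after $elapsedMs ms")
    }
  }
}

// Example use in the workload (case 2):
//   PhaseLog.timed("query") { spark.sql("...").show() }
//   PhaseLog.timed("sleep") { Thread.sleep(60 * 1000) }
//   PhaseLog.timed("stop")  { spark.stop() }
```

With the three phases logged separately, the time of the spike can be compared
directly against the end of the query, the sleep, and the call to `stop()`.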

Could anyone help explain the cause of the disk spike and how to avoid it?
Does the Spark context perform some periodic asynchronous IO that leads to
the spikes?

System Background:
1. Spark 2.3.1, Hadoop 2.9.1, and Hive 2.3.4 for metadata storage.
2. One master and two worker nodes. Each node has ample resources
(32 cores, 750 GB of memory, and eight 8 TB disks, disk1 to disk8).
3. HDFS is deployed on disk8; disk1 is used as the local storage for Spark
shuffle writes.
4. I use YARN in client mode for resource management.
5. No other large application is running in the background.

Current Analysis:
1. The spike is not caused by the disk itself or by other background
processes. To test whether the spike is tied to the program, I tried disk2,
disk3, disk4, and disk8 as the YARN local storage, and it is: the same spike
appeared every time I ran case 2.
2. The spike appears to come from Spark itself rather than from YARN. I
tried the standalone deploy mode and the spike still appears.
3. It might be related to the shuffle size. The total shuffle write size of
the target batch job is close to 2 GB. I also tried different workloads with
shuffle write sizes close to 1 MB, 250 MB, and 1 GB. DISKBUSY is negligible
for the batch job with a 1 MB shuffle write size and reaches up to 80% for
the batch job with a total shuffle write size of 250 MB.
4. The disk spike might be due to disk swapping. I traced the size of the
local storage directory: when the spike appears, disk writes are detected
but the size does not increase, so it might be doing some disk swap?
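
For reference, the size trace mentioned in point 4 can be reproduced with a
small poller like the one below. This is only a sketch: `DirSize` is a
hypothetical name, the directory to watch is passed as the first argument,
and the one-second interval is an arbitrary choice. It reports logical file
sizes only, so on its own it cannot tell new data apart from data that was
already counted and is being flushed to disk later.

```scala
import java.nio.file.{Files, Path, Paths}
import java.time.Instant
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical helper: polls the total size of a directory once per second.
object DirSize {
  // Sum of the sizes of all regular files under `root`. Files that vanish
  // while being inspected count as 0 bytes.
  def totalBytes(root: Path): Long = {
    val stream = Files.walk(root)
    try {
      stream.iterator().asScala
        .filter(p => Files.isRegularFile(p))
        .map(p => Try(Files.size(p)).getOrElse(0L))
        .sum
    } finally {
      stream.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val root = Paths.get(args(0)) // e.g. the Spark local directory on disk1
    while (true) {
      // A walk can fail if a subdirectory is deleted mid-scan; report -1 then.
      val size = Try(totalBytes(root)).getOrElse(-1L)
      println(s"${Instant.now()} $size")
      Thread.sleep(1000)
    }
  }
}
```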

Case 1:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t10001/first-case.png> 
Case 2:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t10001/second-case.png> 

To clarify the figures: /worker1 node local/ and /worker2 node local/ stand
for disk1 on worker1 and worker2, respectively; /worker1 node dfs/ and
/worker2 node dfs/ stand for disk8 on worker1 and worker2, respectively,
where HDFS resides. The left y-axis is DISKBUSY (from 0% to 100%) as
reported by nmon, and the right y-axis is the size of the HDFS directory on
disk8 (which can be ignored for this problem).

Here is the code for the workload.

import org.apache.spark.sql.SparkSession
object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >=
         |  unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <=
         |  unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
         |GROUP BY w_state,i_item_id
         |--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
         |ORDER BY w_state,i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

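    // For case 2: uncomment the sleep to keep the Spark context alive for
    // one minute after the query finishes and before spark.stop().
    // Thread.sleep(60 * 1000)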

    spark.stop()
  }
}
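
To relate the spike to shuffle volume, the total shuffle write size can also
be collected from inside the job instead of being read off the web UI. The
sketch below assumes the Spark 2.x listener API (`SparkListener`,
`SparkListenerTaskEnd`, and `TaskMetrics.shuffleWriteMetrics`);
`ShuffleWriteListener` is a hypothetical name, and this is not necessarily how
the sizes quoted above were obtained.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: sums the shuffle bytes written by all finished tasks.
class ShuffleWriteListener extends SparkListener {
  val bytesWritten = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics // can be null, e.g. for a failed task
    if (metrics != null) {
      bytesWritten.addAndGet(metrics.shuffleWriteMetrics.bytesWritten)
    }
  }
}

// Example use inside main, after `spark` has been created:
//   val listener = new ShuffleWriteListener
//   spark.sparkContext.addSparkListener(listener)
//   ... run the query ...
//   println(s"shuffle write bytes: ${listener.bytesWritten.get}")
```

Listener events are delivered asynchronously, so a value read immediately
after the query returns may lag slightly behind the final total.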


