Posted to issues@spark.apache.org by "kanika dhuria (JIRA)" <ji...@apache.org> on 2019/06/28 17:22:00 UTC
[jira] [Reopened] (SPARK-22207) High memory usage when converting relational data to Hierarchical data
[ https://issues.apache.org/jira/browse/SPARK-22207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
kanika dhuria reopened SPARK-22207:
-----------------------------------
The same issue is seen in Spark 2.4.
> High memory usage when converting relational data to Hierarchical data
> ----------------------------------------------------------------------
>
> Key: SPARK-22207
> URL: https://issues.apache.org/jira/browse/SPARK-22207
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: kanika dhuria
> Priority: Major
> Labels: bulk-closed
>
> There are 4 tables:
> lineitems ~1.4 GB,
> orders ~330 MB,
> customer ~47 MB,
> nations ~2.2 KB.
> These tables are related as follows:
> There are multiple lineitems per order (pk, fk: orderkey).
> There are multiple orders per customer (pk, fk: cust_key).
> There are multiple customers per nation (pk, fk: nation key).
> Data is almost evenly distributed.
> Building the hierarchy up to 3 levels, i.e. joining lineitems, orders, and customers, works fine with 4 GB/2 cores of executor memory.
> Adding nations requires 8 GB/2 cores or 4 GB/1 core.
> ==============================================================
> {noformat}
> val sqlContext = SparkSession.builder()
>   .enableHiveSupport()
>   .config("spark.sql.retainGroupColumns", false)
>   .config("spark.sql.crossJoin.enabled", true)
>   .getOrCreate()
>
> val orders = sqlContext.sql("select * from orders")
> val lineItem = sqlContext.sql("select * from lineitems")
>
> val customer = sqlContext.sql("select * from customers")
>
> val nation = sqlContext.sql("select * from nations")
>
> val lineitemOrders = lineItem.groupBy(col("l_orderkey"))
>   .agg(col("l_orderkey"), collect_list(struct(col("l_partkey"), col("l_suppkey"),
>     col("l_linenumber"), col("l_quantity"), col("l_extendedprice"), col("l_discount"),
>     col("l_tax"), col("l_returnflag"), col("l_linestatus"), col("l_shipdate"),
>     col("l_commitdate"), col("l_receiptdate"), col("l_shipinstruct"), col("l_shipmode"))).as("lineitem"))
>   .join(orders, orders("O_ORDERKEY") === lineItem("l_orderkey"))
>   .select(col("O_ORDERKEY"), col("O_CUSTKEY"), col("O_ORDERSTATUS"), col("O_TOTALPRICE"),
>     col("O_ORDERDATE"), col("O_ORDERPRIORITY"), col("O_CLERK"), col("O_SHIPPRIORITY"),
>     col("O_COMMENT"), col("lineitem"))
>
> val customerList = lineitemOrders.groupBy(col("o_custkey"))
>   .agg(col("o_custkey"), collect_list(struct(col("O_ORDERKEY"), col("O_CUSTKEY"),
>     col("O_ORDERSTATUS"), col("O_TOTALPRICE"), col("O_ORDERDATE"), col("O_ORDERPRIORITY"),
>     col("O_CLERK"), col("O_SHIPPRIORITY"), col("O_COMMENT"), col("lineitem"))).as("items"))
>   .join(customer, customer("c_custkey") === lineitemOrders("o_custkey"))
>   .select(col("c_custkey"), col("c_name"), col("c_nationkey"), col("items"))
>
> val nationList = customerList.groupBy(col("c_nationkey"))
>   .agg(col("c_nationkey"), collect_list(struct(col("c_custkey"), col("c_name"),
>     col("c_nationkey"), col("items"))).as("custList"))
>   .join(nation, nation("n_nationkey") === customerList("c_nationkey"))
>   .select(col("n_nationkey"), col("n_name"), col("custList"))
>
> nationList.write.mode("overwrite").json("filePath")
> {noformat}
> ========================================================
> If customerList is saved to a file and the last agg/join is then run separately, it runs fine with 4 GB/2 cores.
> I can provide the data if needed.
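> The split described above can be sketched roughly as follows (the intermediate path and the use of Parquet are illustrative assumptions, not from the original report; the workaround is simply "materialize customerList, then run the last step against the re-read copy"):
> {noformat}
> // Stage 1: materialize the customer-level hierarchy to an intermediate file
> // ("/tmp/customerList" is a hypothetical path).
> customerList.write.mode("overwrite").parquet("/tmp/customerList")
>
> // Stage 2 (separate run): re-read the materialized data, so the final
> // agg/join no longer carries the whole upstream plan in one job.
> val customerListSaved = sqlContext.read.parquet("/tmp/customerList")
>
> val nationList = customerListSaved.groupBy(col("c_nationkey"))
>   .agg(col("c_nationkey"), collect_list(struct(col("c_custkey"), col("c_name"),
>     col("c_nationkey"), col("items"))).as("custList"))
>   .join(nation, nation("n_nationkey") === customerListSaved("c_nationkey"))
>   .select(col("n_nationkey"), col("n_name"), col("custList"))
>
> nationList.write.mode("overwrite").json("filePath")
> {noformat}
> Running the final level against the materialized copy fits in 4 GB/2 cores, which is why the all-in-one plan needing 8 GB looks like excessive memory use rather than an inherent data-size requirement.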
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org