Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:16:56 UTC

[jira] [Resolved] (SPARK-22599) Avoid extra reading for cached table

     [ https://issues.apache.org/jira/browse/SPARK-22599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22599.
----------------------------------
    Resolution: Incomplete

> Avoid extra reading for cached table
> ------------------------------------
>
>                 Key: SPARK-22599
>                 URL: https://issues.apache.org/jira/browse/SPARK-22599
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Nan Zhu
>            Priority: Major
>              Labels: bulk-closed
>
> In the current implementation of Spark, InMemoryTableExec reads all data in a cached table, filters CachedBatches according to their stats, and passes the surviving data to the downstream operators. This makes it inefficient to keep a whole table in memory in order to serve various queries against different partitions of that table, a pattern that covers a significant portion of our users' scenarios.
> The following is an example of such a use case:
> store_sales is a 1 TB table in cloud storage, partitioned by 'location'. The first query, Q1, outputs several metrics A, B, and C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis of the sales in different locations. To avoid unnecessary I/O and Parquet/ORC parsing overhead, they cache the whole table in memory when running Q1.
> With the current implementation, even when each data scientist is interested in only one of the locations, the queries they submit to the Spark cluster still read the full 1 TB of cached data.
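> The access pattern above can be sketched as follows (a minimal illustration; column names such as location, store_id, and net_paid are placeholders, not the actual TPC-DS store_sales schema):
> {code}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
>
> val spark = SparkSession.builder().appName("cached-table-example").getOrCreate()
> import spark.implicits._
>
> // Q1: scan the whole table once and cache it in memory.
> val storeSales = spark.table("store_sales").cache()
> storeSales.count() // materializes the cache
>
> // Later queries touch a single location, but the current implementation
> // still reads every CachedBatch of the in-memory table.
> val perStoreMetrics = storeSales
>   .filter($"location" === "seattle")
>   .groupBy($"store_id")
>   .agg(sum($"net_paid"))
> {code}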
> The reason behind the extra reading operation is that we implement CachedBatch as
> {code}
> case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
> {code}
> where "stats" is a part of every CachedBatch, so we can only filter batches for output of InMemoryTableExec operator by reading all data in in-memory table as input. The extra reading would be even more unacceptable when some of the table's data is evicted to disks.
> We propose to introduce a new type of block, the metadata block, for the partitions of the RDD representing the cached table's data. Every metadata block contains stats for all columns of a partition and is saved to the BlockManager when the compute() method is executed for that partition. To minimize the number of bytes read, InMemoryTableExec can then consult these metadata blocks first and fetch only the data blocks of partitions whose stats match the query's filters.
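> A hedged sketch of the idea (names here are hypothetical; see the design doc below for the actual proposal): per-partition stats are kept in a separate metadata block, so whole partitions can be skipped without ever fetching their data blocks.
> {code}
> import org.apache.spark.sql.catalyst.InternalRow
>
> // Hypothetical metadata block holding stats for all columns of one partition.
> case class PartitionMetadataBlock(partitionId: Int, columnStats: InternalRow)
>
> // Only partitions whose stats pass the predicate need their data blocks
> // read from the BlockManager; the others are never touched.
> def partitionsToRead(
>     metadata: Seq[PartitionMetadataBlock],
>     statsPredicate: InternalRow => Boolean): Seq[Int] =
>   metadata.filter(m => statsPredicate(m.columnStats)).map(_.partitionId)
> {code}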
> More details can be found in the design doc: https://docs.google.com/document/d/1DSiP3ej7Wd2cWUPVrgqAtvxbSlu5_1ZZB6m_2t8_95Q/edit?usp=sharing
> Performance test results:
> Environment: 6 executors, each with 16 cores and 90 GB of memory
> Dataset: 1 TB of TPC-DS data
> Queries: 4 queries (Q19, Q46, Q34, Q27) from https://github.com/databricks/spark-sql-perf/blob/c2224f37e50628c5c8691be69414ec7f5a3d919a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala
> Results: https://docs.google.com/spreadsheets/d/1A20LxqZzAxMjW7ptAJZF4hMBaHxKGk3TBEQoAJXfzCI/edit?usp=sharing


