Posted to issues@spark.apache.org by "Vinitha Reddy Gankidi (JIRA)" <ji...@apache.org> on 2019/06/27 18:20:00 UTC

[jira] [Created] (SPARK-28188) Materialize Dataframe API

Vinitha Reddy Gankidi created SPARK-28188:
---------------------------------------------

             Summary: Materialize Dataframe API 
                 Key: SPARK-28188
                 URL: https://issues.apache.org/jira/browse/SPARK-28188
             Project: Spark
          Issue Type: New Feature
          Components: Spark Core
    Affects Versions: 2.4.3
            Reporter: Vinitha Reddy Gankidi


We have added a new API to materialize dataframes, and our internal users have found it very useful. When several different computations run on the same dataframe, Spark recomputes the dataframe for each one. This is a problem when the dataframe is expensive to evaluate.
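
The recomputation can be observed with a small local experiment. The following is a minimal sketch that uses only stock Spark, not the new API: the object name RecomputeDemo, the local[2] master, and the accumulator-based counter are assumptions made for illustration, and the map stands in for an expensive computation.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object, not part of Spark or of the proposed API.
object RecomputeDemo {
  // Runs two actions on the same dataset and returns how many times
  // the per-row function was evaluated in total.
  def countEvaluations(rows: Long): Long = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("recompute-demo")
      .getOrCreate()
    import spark.implicits._

    // Incremented once for every row that passes through the map below.
    val evaluations = spark.sparkContext.longAccumulator("evaluations")

    // Stand-in for an expensive dataframe: nothing runs until an action is called.
    val expensive = spark.range(rows).map { id => evaluations.add(1L); id }

    expensive.collect() // first action: evaluates every row
    expensive.collect() // second action: evaluates every row again

    evaluations.value.longValue()
  }
}
```

In a local run with no task retries, RecomputeDemo.countEvaluations(100L) should report twice the row count, because each action re-evaluates the whole lineage.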

Materialize is a Spark action. It explicitly tells Spark that the dataframe has already been computed. Once a dataframe is materialized, Spark skips all stages before the materialize call whenever the dataframe is reused.

Spark may scan the same table twice if two queries load different columns. For example, the following two queries would scan the same data twice:
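{code:java}
// Assumes a spark-shell session, where spark.implicits._ is in scope for $"..."
import org.apache.spark.sql.functions.{countDistinct, min}

val tab = spark.table("some_table").filter("c LIKE '%match%'")

// First query: needs column a, plus c for the filter
val num_groups = tab.agg(countDistinct($"a"))

// Second query: also needs column b, so the table is scanned again
val groups_with_b = tab.groupBy($"a").agg(min($"b") as "min"){code}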
 

The same table is scanned twice because Spark doesn't know it should load b when the first query runs. You can use materialize to load and then reuse the data:

{code:java}
// materialize() is the new API proposed in this issue
val materialized = spark.table("some_table").filter("c LIKE '%match%'")
                        .select($"a", $"b").repartition($"a").materialize()

val num_groups = materialized.agg(countDistinct($"a"))

val groups_with_b = materialized.groupBy($"a").agg(min($"b") as "min"){code}
 

This uses select to filter out columns that don't need to be loaded. Without this, Spark doesn't know that only a and b are going to be used later.

This example also uses repartition to add a shuffle, because Spark resumes from the last shuffle. You will usually need to repartition the dataframe before materializing it: repartition introduces a new stage boundary, which lets Spark skip the expensive stages before it.
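
That repartition adds a shuffle can be checked in the physical plan. The following is a rough sketch with stock Spark only: the object name RepartitionPlanDemo, the in-memory sample rows, and the local[2] master are assumptions for illustration, and the exact plan text differs between Spark versions.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object, not part of Spark or of the proposed API.
object RepartitionPlanDemo {
  // Returns the physical plan, as text, of a small dataframe repartitioned by column a.
  def planWithRepartition(): String = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("repartition-plan-demo")
      .getOrCreate()
    import spark.implicits._

    // Small in-memory stand-in for the filtered and projected table.
    val df = Seq((1, 10), (1, 20), (2, 30)).toDF("a", "b")

    // repartition by a column is planned as a hash-partitioned shuffle,
    // which should appear as an Exchange node in the plan.
    df.repartition($"a").queryExecution.executedPlan.toString
  }
}
```

Printing RepartitionPlanDemo.planWithRepartition() should show an Exchange node above the scan. That shuffle boundary is the point a later job can resume from.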
h3. Materialize vs Cache:
 * Caching/persisting of dataframes is lazy: the first time the dataframe is computed in an action, it is kept in memory on the nodes. Materialize is an action: it runs a job that produces the rows the dataframe represents and returns a new dataframe with the result. When the result dataframe is used, Spark resumes execution using the data from the last shuffle.
 * Because it reuses shuffle data, materialized data is served by the cluster's persistent shuffle servers instead of Spark executors. This makes materialize more reliable. Caching, on the other hand, happens in the executor where the task runs, and the data can be lost if executors time out from inactivity or run out of memory.
 * Since materialize is more reliable and uses fewer resources than cache, it is usually a better choice for batch workloads. For processing that iterates over a dataset many times, however, it is better to keep the data in memory using cache or persist.
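
The lazy behaviour of cache described in the first bullet can be sketched with stock Spark. This is an illustration under assumptions, not the materialize implementation: the object name LazyCacheDemo, the local[2] master, and the accumulator used as an evaluation counter are all hypothetical choices.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object, not part of Spark or of the proposed API.
object LazyCacheDemo {
  // Returns the number of per-row evaluations observed
  // (after cache(), after the first action, after the second action).
  def evaluationCounts(rows: Long): (Long, Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("lazy-cache-demo")
      .getOrCreate()
    import spark.implicits._

    // Incremented once for every row that passes through the map below.
    val evaluations = spark.sparkContext.longAccumulator("evaluations")

    // cache() only marks the dataset for caching; no job runs yet.
    val cached = spark.range(rows).map { id => evaluations.add(1L); id }.cache()
    val afterCacheCall = evaluations.value.longValue()

    cached.collect() // first action: computes the rows and fills the cache
    val afterFirstAction = evaluations.value.longValue()

    cached.collect() // second action: served from the cache in executor memory
    val afterSecondAction = evaluations.value.longValue()

    cached.unpersist()
    (afterCacheCall, afterFirstAction, afterSecondAction)
  }
}
```

In a local run with no evictions or task retries, the counter should stay at zero after cache(), reach the row count after the first action, and not grow after the second. The cached blocks live in executor memory, which is why they can be lost when an executor goes away.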


