You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "David Borsos (JIRA)" <ji...@apache.org> on 2018/10/11 10:23:00 UTC

[jira] [Created] (SPARK-25707) Repeated caching doesn't work when using an UDF on a nullable field

David Borsos created SPARK-25707:
------------------------------------

             Summary: Repeated caching doesn't work when using an UDF on a nullable field
                 Key: SPARK-25707
                 URL: https://issues.apache.org/jira/browse/SPARK-25707
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, Spark Core
    Affects Versions: 2.3.1
         Environment: Reproducible on local Spark executors
            Reporter: David Borsos
         Attachments: UdfCachingNullableBug.scala

It seems like Spark doesn't manage find already cached data and triggers a re-read from the source in case when all the following circumstances are present:
 # Caching a dataset that is derived from another already cached dataset
 # Having nullable fields in the Dataset of types that are mapped to primitives (Integer, Boolean, etc...)
 # Using a UDF that takes non-nullable primitive parameters (Int, Boolean) on one of these nullable fields

In this case Spark's optimizer will create a null-check on these fields prior to passing them into the UDF's code (presumably to prevent a NullPointerException). The plans with the null-checks are generated when a Dataset is persisted and when the optimizer is looking for fragments of a query plan that can be read from cache instead of resolving the full Dataset lineage.

However; the plan fragments between the query that is currently being persisted and what is in the cache do not match - the lookup version will have the null-check included into the query plan twice; thus preventing the optimizer from generating a plan that relies on the cached data and ultimately triggering a repeated read from the original datasource.

The reproduction sequence in pseudocode:
{code:java}
base = spark.someDataset(columns: ("a" String, "b" Int))
someUdf = someUdf(x: Int)
cached = base.select($"a", someUdf($"b")).cache()
cachedAgain = cached.select(...).cached(){code}
In this case I'd expect 'cachedAgain' to re-use the data from 'cached' instead of re-reading 'base'.

I'm attaching a small Scala code fragment with a more detailed reproduction. 

Interestingly; the whole process works if I replace 'Int' with 'java.lang.Integer' in the UDF parameter-list. The generated physical plans are:
{code:java}
--- using scala.Int ---
== Physical Plan ==
InMemoryTableScan [a#6, (b * 2)#23]
+- InMemoryRelation [a#6, (b * 2)#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [a#6, (if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) * 2) AS (b * 2)#23]
+- Scan ExistingRDD[a#6,b#7]

{code}
{code:java}
--- using java.lang.Integer ---
== Physical Plan ==
InMemoryTableScan [a#53, (b * 2)#70]
+- InMemoryRelation [a#53, (b * 2)#70], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [a#53, (b#57 * 2) AS (b * 2)#70]
+- InMemoryTableScan [a#53, b#57]
+- InMemoryRelation [a#53, b#57], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [a#53, UDF(b#54) AS b#57]
+- Scan ExistingRDD[a#53,b#54]{code}
The underlying issue seems to be that when `persist` is called; Spark will effectively execute this code:
{code:java}
spark.sessionState.executePlan(dataset.queryExecution.analyzed).analyzed{code}
(CacheManager line #100 and #166)

This seems to generate the null-check on nullable fields twice, which won't match with the already cached plan:
{code:java}
Plan cached:
Project [a#6, if (isnull(b#7)) null else UDF(b#7) AS b#10]
+- LogicalRDD [a#6, b#7], false


Plan that is used in the lookup:
Project [a#6, if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) AS b#10]
+- LogicalRDD [a#6, b#7], false{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org