Posted to issues@spark.apache.org by "David Borsos (JIRA)" <ji...@apache.org> on 2018/10/11 10:23:00 UTC
[jira] [Created] (SPARK-25707) Repeated caching doesn't work when using a UDF on a nullable field
David Borsos created SPARK-25707:
------------------------------------
Summary: Repeated caching doesn't work when using a UDF on a nullable field
Key: SPARK-25707
URL: https://issues.apache.org/jira/browse/SPARK-25707
Project: Spark
Issue Type: Bug
Components: Optimizer, Spark Core
Affects Versions: 2.3.1
Environment: Reproducible on local Spark executors
Reporter: David Borsos
Attachments: UdfCachingNullableBug.scala
It seems that Spark fails to find already cached data and triggers a re-read from the source when all of the following circumstances are present:
# Caching a dataset that is derived from another already cached dataset
# Having nullable fields in the Dataset of types that are mapped to primitives (Integer, Boolean, etc...)
# Using a UDF that takes non-nullable primitive parameters (Int, Boolean) on one of these nullable fields
In this case Spark's optimizer will insert a null check on these fields before passing them into the UDF's code (presumably to prevent a NullPointerException). The plans with the null checks are generated both when a Dataset is persisted and when the optimizer is looking for fragments of a query plan that can be read from cache instead of resolving the full Dataset lineage.
However, the plan fragments between the query currently being persisted and what is in the cache do not match: the lookup version has the null check included in the query plan twice, which prevents the optimizer from generating a plan that relies on the cached data and ultimately triggers a repeated read from the original data source.
The reproduction sequence in pseudocode:
{code:java}
base = spark.someDataset(columns: ("a" String, "b" Int))
someUdf = someUdf(x: Int)
cached = base.select($"a", someUdf($"b")).cache()
cachedAgain = cached.select(...).cache(){code}
In this case I'd expect 'cachedAgain' to re-use the data from 'cached' instead of re-reading 'base'.
I'm attaching a small Scala code fragment with a more detailed reproduction.
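For reference, a minimal self-contained sketch of the reproduction (the column names, sample rows, and UDF body below are illustrative, not taken from the attachment; it requires a local SparkSession and cannot assert the cache miss programmatically, only make it visible via explain):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfCachingNullableRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Option[Int] maps to a nullable Integer column "b"
    val base = Seq(("x", Some(1)), ("y", None: Option[Int])).toDF("a", "b")

    // UDF parameter is a non-nullable primitive Int, so the analyzer
    // wraps the call in a null check on b
    val someUdf = udf((x: Int) => x * 2)

    val cached = base.select($"a", someUdf($"b").as("b")).cache()
    cached.count() // materialize the first cache

    // Expected: an InMemoryTableScan over `cached`;
    // observed on 2.3.1: a re-read of `base` instead
    val cachedAgain = cached.select($"a", ($"b" * 2).as("b2")).cache()
    cachedAgain.explain()

    spark.stop()
  }
}
```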
Interestingly, the whole process works if I replace 'Int' with 'java.lang.Integer' in the UDF parameter list. The generated physical plans are:
{code:java}
--- using scala.Int ---
== Physical Plan ==
InMemoryTableScan [a#6, (b * 2)#23]
   +- InMemoryRelation [a#6, (b * 2)#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [a#6, (if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) * 2) AS (b * 2)#23]
            +- Scan ExistingRDD[a#6,b#7]
{code}
{code:java}
--- using java.lang.Integer ---
== Physical Plan ==
InMemoryTableScan [a#53, (b * 2)#70]
   +- InMemoryRelation [a#53, (b * 2)#70], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [a#53, (b#57 * 2) AS (b * 2)#70]
            +- InMemoryTableScan [a#53, b#57]
                  +- InMemoryRelation [a#53, b#57], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) Project [a#53, UDF(b#54) AS b#57]
                           +- Scan ExistingRDD[a#53,b#54]{code}
The underlying issue seems to be that when `persist` is called, Spark effectively executes this code:
{code:java}
spark.sessionState.executePlan(dataset.queryExecution.analyzed).analyzed{code}
(CacheManager, lines 100 and 166)
This seems to generate the null check on nullable fields twice, which won't match the already cached plan:
{code:java}
Plan cached:
Project [a#6, if (isnull(b#7)) null else UDF(b#7) AS b#10]
+- LogicalRDD [a#6, b#7], false

Plan that is used in the lookup:
Project [a#6, if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) AS b#10]
+- LogicalRDD [a#6, b#7], false{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)