You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2016/06/06 18:52:21 UTC

[jira] [Assigned] (SPARK-15632) Dataset typed filter operation changes query plan schema

     [ https://issues.apache.org/jira/browse/SPARK-15632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-15632:
------------------------------------

    Assignee: Apache Spark  (was: Xiang Zhong)

> Dataset typed filter operation changes query plan schema
> --------------------------------------------------------
>
>                 Key: SPARK-15632
>                 URL: https://issues.apache.org/jira/browse/SPARK-15632
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Cheng Lian
>            Assignee: Apache Spark
>
> h1. Overview
> Filter operations should never change query plan schema. However, Dataset typed filter operation does introduce schema change in some cases. Furthermore, all the following aspects of the schema may be changed:
> # field order,
> # field number,
> # field data type,
> # field name, and
> # field nullability
> This is mostly because we wrap the actual {{Filter}} operator with a {{SerializeFromObject}}/{{DeserializeToObject}} pair (query plan fragment illustrated as following), which performs a bunch of magic tricks.
> {noformat}
> SerializeFromObject
>  Filter
>   DeserializeToObject
>    <child-plan>
> {noformat}
> h1. Reproduction
> h2. Field order, field number, and field data type change
> {code}
> case class A(b: Double, a: String)
> val data = Seq(
>   "{ 'a': 'foo', 'b': 1, 'c': 'extra' }",
>   "{ 'a': 'bar', 'b': 2, 'c': 'extra' }",
>   "{ 'a': 'bar', 'c': 'extra' }"
> )
> val df1 = spark.read.json(sc.parallelize(data))
> df1.printSchema()
> // root
> //  |-- a: string (nullable = true)
> //  |-- b: long (nullable = true)
> //  |-- c: string (nullable = true)
> val ds1 = df1.as[A]
> ds1.printSchema()
> // root
> //  |-- a: string (nullable = true)
> //  |-- b: long (nullable = true)
> //  |-- c: string (nullable = true)
> val ds2 = ds1.filter(_.b > 1)    // <- Here comes the trouble maker
> ds2.printSchema()
> // root                             <- 1. Reordered `a` and `b`, and
> //  |-- b: double (nullable = true)    2. dropped `c`, and
> //  |-- a: string (nullable = true)    3. up-casted `b` from long to double
> val df2 = ds2.toDF()
> df2.printSchema()
> // root                             <- (Same as above)
> //  |-- b: double (nullable = true)
> //  |-- a: string (nullable = true)
> {code}
> h3. Field order change
> {{DeserializeToObject}} resolves the encoder deserializer expression by *name*. Thus field order in input query plan doesn't matter.
> h3. Field number change
> Same as above, fields not referred by the encoder are silently dropped while resolving deserializer expressions by name.
> h3. Field data type change
> When generating deserializer expressions, we allows "sane" implicit coercions (e.g. integer to long, and long to double) by inserting {{UpCast}} operators. Thus actual field data types in input query plan don't matter either as long as there are valid implicit coercions.
> h2. Field name and nullability change
> {code}
> val ds3 = spark.range(10)
> ds3.printSchema()
> // root
> //  |-- id: long (nullable = false)
> val ds4 = ds3.filter(_ > 3)
> ds4.printSchema()
> // root
> //  |-- value: long (nullable = true)  4. Name changed from `id` to `value`, and
> //                                     5. nullability changed from false to true
> {code}
> h3. Field name change
> Primitive encoders like {{Encoder\[Long\]}} doesn't have a named field, thus they always has only a single field with hard-coded name "value". On the other hand, when serializing domain objects back to rows, schema of {{SerializeFromObject}} is solely determined by the encoder. Thus the original name "id" becomes "value".
> h3. Nullability change
> [PR #11880|https://github.com/apache/spark/pull/11880] updated return type of {{SparkSession.range}} from {{Dataset\[Long\]}} to {{Dataset\[java.lang.Long\]}} due to [SI-4388|https://issues.scala-lang.org/browse/SI-4388]. As a consequence, although the underlying {{Range}} operator produces non-nullable output, the result encoder is nullable since {{java.lang.Long}} is nullable. Thus, we observe nullability change after typed filtering because serializer expression is derived from encoder rather than the query plan.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org