Posted to user@spark.apache.org by Mike Wheeler <ro...@gmail.com> on 2017/05/01 04:12:09 UTC

Schema Evolution for nested Dataset[T]

Hi Spark Users,

Suppose I have some data (stored in parquet for example) generated as below:

package com.company.entity.old
case class Course(id: Int, students: List[Student])
case class Student(name: String)

Then I can usually read the data with

spark.read.parquet("data.parquet").as[Course]

Now I want to add a new field `address` to Student:

package com.company.entity.`new`
case class Course(id: Int, students: List[Student])
case class Student(name: String, address: String)

Then obviously running `spark.read.parquet("data.parquet").as[Course]`
on data generated by the old entity/schema will fail because `address`
is missing.

In this case, what is the best practice for reading data generated with
the old entity/schema into the new entity/schema, with the missing field
set to some default value? I know I can manually write a function to do
the transformation from the old schema to the new one, but that is kind
of tedious. Are there any automatic methods?

Thanks,

Mike

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Schema Evolution for nested Dataset[T]

Posted by Michael Armbrust <mi...@databricks.com>.
Unfortunately there is not an easy way to add nested columns (though I do
think we should implement the API you attempted to use).

You'll have to build the struct manually.

// rebuild the struct column, filling a null age with 0
allData.withColumn("student", struct(
  $"student.name",
  coalesce($"student.age", lit(0)) as 'age))

You could automate the construction of this column by looking at the schema
if you want to get fancy.
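
For instance, here is a rough sketch of that automation, assuming the column
to repair is a top-level struct (an array of structs, like `students` in your
example, would additionally need per-element handling); the helper name and
the defaults map are made up for illustration:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, lit, struct}
import org.apache.spark.sql.types.StructType

// Rebuild a struct column from its schema, replacing null fields with the
// caller-supplied defaults and passing the other fields through unchanged.
def fillStructDefaults(df: DataFrame, colName: String, defaults: Map[String, Column]): DataFrame = {
  val structType = df.schema(colName).dataType.asInstanceOf[StructType]
  val fields = structType.fields.map { f =>
    val original = df(s"$colName.${f.name}")
    defaults.get(f.name) match {
      case Some(d) => coalesce(original, d).as(f.name)
      case None    => original.as(f.name)
    }
  }
  df.withColumn(colName, struct(fields: _*))
}

// e.g. fillStructDefaults(allData, "student", Map("age" -> lit(0)))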


Re: Schema Evolution for nested Dataset[T]

Posted by Mike Wheeler <ro...@gmail.com>.
Hi Michael,

Thank you for the suggestions. I am wondering how I can make `withColumn`
handle a nested structure?

For example, below is my code to generate the data. I basically add the
`age` field to `Person2`, which is nested in an Array for `Course2`. Then I
want to fill in 0 for `age` when it is null.

case class Person1(name: String)
case class Person2(name: String, age: Int)
case class Course1(id: Int, students: Array[Person1])
case class Course2(id: Int, students: Array[Person2])

Seq(Course1(10, Array(Person1("a"), Person1("b")))).toDF.write.parquet("data1")
Seq(Course2(20, Array(Person2("c", 20), Person2("d", 10)))).toDF.write.parquet("data2")

val allData = spark.read.option("mergeSchema", "true").parquet("data1", "data2")
allData.show

+---+--------------------+
| id|            students|
+---+--------------------+
| 20|    [[c,20], [d,10]]|
| 10|[[a,null], [b,null]]|
+---+--------------------+



*My first try:*

allData.withColumn("students.age", coalesce($"students.age", lit(0)))

It throws the exception:

org.apache.spark.sql.AnalysisException: cannot resolve
'coalesce(`students`.`age`, 0)' due to data type mismatch: input to
function coalesce should all be the same type, but it's [array<int>, int];;



*My second try: *

allData.withColumn("students.age", coalesce($"students.age", array(lit(0), lit(0)))).show


+---+--------------------+------------+
| id|            students|students.age|
+---+--------------------+------------+
| 20|    [[c,20], [d,10]]|    [20, 10]|
| 10|[[a,null], [b,null]]|[null, null]|
+---+--------------------+------------+

It creates a new top-level column named "students.age" instead of imputing
the `age` values nested inside `students`.

Thank you very much in advance.

Mike

Re: Schema Evolution for nested Dataset[T]

Posted by Michael Armbrust <mi...@databricks.com>.
Oh, and if you want a default other than null:

import org.apache.spark.sql.functions._
df.withColumn("address", coalesce($"address", lit(<default>)))
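
For instance, a concrete version of that pattern, assuming an empty string
should stand in for the missing address (the default value here is only an
illustration):

// uses the functions._ import above; replaces a null address with ""
val withAddress = df.withColumn("address", coalesce($"address", lit("")))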


Re: Schema Evolution for nested Dataset[T]

Posted by Michael Armbrust <mi...@databricks.com>.
The following should work:

val schema = implicitly[org.apache.spark.sql.Encoder[Course]].schema
spark.read.schema(schema).parquet("data.parquet").as[Course]

Note this will only work for nullable fields (i.e. if you add a primitive
like Int you need to make it an Option[Int]).
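
For example, a minimal end-to-end sketch of that approach; the evolved case
classes below (with an extra age field) and the Encoders.product call are
illustrative assumptions, not taken verbatim from the thread:

import org.apache.spark.sql.Encoders
import spark.implicits._   // for .as[Course]; already in scope in spark-shell

// Evolved classes: `address` is a nullable reference type, and the added
// primitive is wrapped in Option so missing values decode as None.
case class Student(name: String, address: String, age: Option[Int])
case class Course(id: Int, students: List[Student])

// Derive the schema from the new case class and impose it on read;
// Encoders.product[Course].schema is equivalent to the implicitly form above.
val schema = Encoders.product[Course].schema
val courses = spark.read.schema(schema).parquet("data.parquet").as[Course]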
