Posted to issues@spark.apache.org by "fqaiser94 (Jira)" <ji...@apache.org> on 2020/01/02 23:26:01 UTC

[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

    [ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007116#comment-17007116 ] 

fqaiser94 edited comment on SPARK-22231 at 1/2/20 11:25 PM:
------------------------------------------------------------

Hi folks, I can personally affirm that this would be a valuable feature to have in Spark. Looking around, it's clear to me that other people need this feature as well, e.g. SPARK-16483, [Mailing list|http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Applying-transformation-on-a-struct-inside-an-array-td18934.html], etc.

A few things have changed since the last comments in this discussion. Support for higher-order functions has been [added|https://databricks.com/blog/2018/11/16/introducing-new-built-in-functions-and-higher-order-functions-for-complex-data-types-in-apache-spark.html] to the Apache Spark project and, in particular, there are now {{transform}} and {{filter}} functions available for operating on ArrayType columns. These are the equivalent of the {{mapItems}} and {{filterItems}} functions that were previously proposed in this ticket. To complete this ticket, then, I think all that is needed is to add the {{withField}}, {{withFieldRenamed}}, and {{drop}} methods to the {{Column}} class. Looking through the discussion, I would summarize the signatures for these new methods as follows:
 - {{def withField(fieldName: String, field: Column): Column}}
 Returns a new StructType column with the field added, or replaced if a field with that name already exists.

 - {{def drop(fieldNames: String*): Column}}
 Returns a new StructType column with the named fields dropped.

 - {{def withFieldRenamed(existingName: String, newName: String): Column}}
 Returns a new StructType column with the field renamed.
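
For comparison, here is a rough sketch of how far the existing higher-order functions already get you. It is not taken from the PR. It assumes Spark 2.4 or later, where the SQL functions {{transform}}, {{filter}}, and {{named_struct}} can be used through {{expr}}, and it reuses the {{Item}}/{{Data}} shapes from the ticket description. The names {{NestedStructSketch}}, {{sample}}, {{filterItems}}, and {{replaceB}} are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

// Same shapes as the examples in the ticket description.
case class Item(a: Int, b: Double)
case class Data(foo: Int, bar: Double, items: Seq[Item])

// Hypothetical helper object, for illustration only.
object NestedStructSketch {
  // Builds the two-row sample dataset used throughout the ticket.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
      Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
    ).toDF()
  }

  // Counterpart of the proposed filterItems: keep array elements with a < 20.
  def filterItems(df: DataFrame): DataFrame =
    df.withColumn("items", expr("filter(items, item -> item.a < 20)"))

  // Counterpart of mapItems + withColumn: replace b with b + 1.
  // Without a withField-style method, the whole struct has to be rebuilt
  // by listing every field explicitly.
  def replaceB(df: DataFrame): DataFrame =
    df.withColumn(
      "items",
      expr("transform(items, item -> named_struct('a', item.a, 'b', item.b + 1))"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("nested-struct-sketch")
      .getOrCreate()
    val df = sample(spark)
    filterItems(df).show(false)
    replaceB(df).show(false)
    spark.stop()
  }
}
```

The filtering case needs nothing new. The struct update works, but every field must be spelled out in {{named_struct}}, which gets unwieldy for wide structs; that is the gap the proposed {{withField}}, {{drop}}, and {{withFieldRenamed}} methods are meant to close.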

Since it didn't seem like anybody was actively working on this, I went ahead and created a pull request to add a {{withField}} method to the {{Column}} class that conforms with the specs discussed in this ticket. You can review the PR here: [https://github.com/apache/spark/pull/27066]

As this is my first PR to the Apache Spark project, I wanted to keep the PR small. However, I wouldn't mind writing the {{drop}} and {{withFieldRenamed}} methods as well in separate PRs once the current PR is accepted. 


> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-22231
>                 URL: https://issues.apache.org/jira/browse/SPARK-22231
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: DB Tsai
>            Assignee: Jeremy Smith
>            Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find great content that fulfills the unique tastes of our members. Before building a recommendation algorithm, we need to prepare the training, testing, and validation datasets in Apache Spark. Due to the nature of ranking problems, we have a nested list of items to be ranked in one column, and the top level holds the contexts describing the setting in which a model is to be used (e.g. profiles, country, time, device, etc.). Here is a blog post describing the details: [Distributed Time Travel for Feature Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id in a given country, our data schema looks like this:
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- title_id: integer (nullable = true)
>  |    |    |-- scores: double (nullable = true)
> ...
> {code}
> We often need to work on the nested list of structs by applying functions to them. Sometimes we drop columns from, or add new columns to, the nested list of structs. Currently, there is no easy way in open source Apache Spark to perform those operations using SQL primitives; many people convert the data into an RDD to work on the nested level of data, and then reconstruct a new dataframe as a workaround. This is extremely inefficient because optimizations like predicate pushdown in SQL cannot be performed, we cannot take advantage of the columnar format, and the serialization and deserialization cost becomes very high even if we just want to add a new column at the nested level.
> We built a solution internally at Netflix which we're very happy with. We plan to open source it in Spark upstream. We would like to socialize the API design to see if we missed any use case.
> The first API we added is *mapItems* on dataframe, which takes a function from *Column* to *Column* and applies it to the nested dataframe. Here is an example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: double (containsNull = true)
> result.show()
> // +---+----+--------------------+
> // |foo| bar|               items|
> // +---+----+--------------------+
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+----+--------------------+
> {code}
> Now, with the ability to apply a function in the nested dataframe, we can add a new function, *withColumn*, to *Column* to add a column or replace the existing column that has the same name in the nested list of structs. Here are two examples demonstrating the API together with *mapItems*; the first one replaces the existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> // |    |    |-- b: double (nullable = true)
> result.show(false)
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 |20.0|[[20,21.0], [21,22.0]]|
> // +---+----+----------------------+
> {code}
> and the second one adds a new column in the nested dataframe.
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "c")
> }
> result.printSchema
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> // |    |    |-- b: double (nullable = true)
> // |    |    |-- c: double (nullable = true)
> result.show(false)
> // +---+----+--------------------------------+
> // |foo|bar |items                           |
> // +---+----+--------------------------------+
> // |10 |10.0|[[10,10.0,11.0], [11,11.0,12.0]]|
> // |20 |20.0|[[20,20.0,21.0], [21,21.0,22.0]]|
> // +---+----+--------------------------------+
> {code}
> We also implemented *filterItems*, which applies a filter predicate to the nested list of structs and returns only the items that match the predicate. The following is an example of the API:
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.filterItems("items") {
>   item => item("a") < 20
> }
> result.show(false)
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,10.0], [11,11.0]]|
> // |20 |20.0|[]                    |
> // +---+----+----------------------+
> {code}
> Dropping a column in the nested list of structs can be achieved with an API similar to *withColumn*. We add a *drop* method to *Column* to implement this. Here is an example:
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.drop("b")
> }
> result.printSchema
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> result.show(false)
> // +---+----+------------+
> // |foo|bar |items       |
> // +---+----+------------+
> // |10 |10.0|[[10], [11]]|
> // |20 |20.0|[[20], [21]]|
> // +---+----+------------+
> {code}
> Note that all of these APIs are implemented as SQL expressions with codegen; as a result, they are not opaque to Spark optimizers and can take full advantage of the columnar data structure.
> We're looking forward to the community's feedback and suggestions! Thanks.


