You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Everett Anderson <ev...@nuna.com.INVALID> on 2017/02/07 19:02:28 UTC

Un-exploding / denormalizing Spark SQL help

Hi,

I'm trying to un-explode or denormalize a table like

+---+----+-----+------+--------+
|id |name|extra|data  |priority|
+---+----+-----+------+--------+
|1  |Fred|8    |value1|1       |
|1  |Fred|8    |value8|2       |
|1  |Fred|8    |value5|3       |
|2  |Amy |9    |value3|1       |
|2  |Amy |9    |value5|2       |
+---+----+-----+------+--------+

into something that looks like

+---+----+------+------+---------+------+------+---------+------+------+---------+
|id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
|priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3
     |
|2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
 |null     |
+---+----+------+------+---------+------+------+---------+------+------+---------+

If I were going the other direction, I'd create a new column with an array
of structs, each with 'extra', 'data', and 'priority' fields and then
explode it.

Going from the more normalized view, though, I'm having a harder time.

I want to group or partition by (id, name) and order by priority, but after
that I can't figure out how to get multiple rows rotated into one.

Any ideas?

Here's the code to create the input table above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
    Row(1, "Fred", 8, "value1", 1),
    Row(1, "Fred", 8, "value8", 2),
    Row(1, "Fred", 8, "value5", 3),
    Row(2, "Amy", 9, "value3", 1),
    Row(2, "Amy", 9, "value5", 2)))

val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("name", StringType, nullable = true),
    StructField("extra", IntegerType, nullable = true),
    StructField("data", StringType, nullable = true),
    StructField("priority", IntegerType, nullable = true)))

val data = sqlContext.createDataFrame(rowsRDD, schema)

Re: Un-exploding / denormalizing Spark SQL help

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
On Wed, Feb 8, 2017 at 1:14 PM, ayan guha <gu...@gmail.com> wrote:

> Will a sql solution will be acceptable?
>

I'm very curious to see how it'd be done in raw SQL if you're up for it!

I think the 2 programmatic solutions so far are viable, though, too.

(By the way, thanks everyone for the great suggestions!)





>
> On Thu, 9 Feb 2017 at 4:01 am, Xiaomeng Wan <sh...@gmail.com> wrote:
>
>> You could also try pivot.
>>
>> On 7 February 2017 at 16:13, Everett Anderson <ev...@nuna.com.invalid>
>> wrote:
>>
>>
>>
>> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mi...@databricks.com>
>> wrote:
>>
>> I think the fastest way is likely to use a combination of conditionals
>> (when / otherwise), first (ignoring nulls), while grouping by the id.
>> This should get the answer with only a single shuffle.
>>
>> Here is an example
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
>> .
>>
>>
>> Very cool! Using the simpler aggregates feels cleaner.
>>
>>
>>
>> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi Everett,
>>
>> That's pretty much what I'd do. Can't think of a way to beat your
>> solution. Why do you "feel vaguely uneasy about it"?
>>
>>
>> Maybe it felt like I was unnecessarily grouping-by twice, but probably
>> mostly that I hadn't used pivot before.
>>
>> Interestingly, the physical plans are not especially different between
>> these two solutions after the rank column is added. They both have two
>> SortAggregates that seem to be figuring out where to put results based on
>> the rank:
>>
>> My original one:
>>
>> == Physical Plan ==
>> *Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
>> data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
>> 2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
>> extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
>> +- SortAggregate(key=[id#279,name#280], functions=[first(if
>> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
>> true),first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else
>> null, true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312
>> else null, true)])
>>    +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
>> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
>> true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
>> else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
>> temp_struct#312 else null, true)])
>>       +- *Project [id#279, name#280, rank#292, struct(extra#281,
>> data#282, priority#283) AS temp_struct#312]
>>          +- Window [denserank(priority#283) windowspecdefinition(id#279,
>> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
>> ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
>>             +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false,
>> 0
>>                +- Exchange hashpartitioning(id#279, name#280, 200)
>>                   +- Scan ExistingRDD[id#279,name#280,
>> extra#281,data#282,priority#283]
>>
>>
>> And modifying Michael's slightly to use a rank:
>>
>> import org.apache.spark.sql.functions._
>>
>> def getColumnWithRank(column: String, rank: Int) = {
>>   first(when(col("rank") === lit(rank), col(column)).otherwise(null),
>> ignoreNulls = true)
>> }
>>
>> val withRankColumn = data.withColumn("rank", functions.dense_rank().over(Window.partitionBy("id",
>> "name").orderBy("priority")))
>>
>> val modCollapsed = withRankColumn
>>   .groupBy($"id", $"name")
>>   .agg(
>>     getColumnWithRank("data", 1) as 'data1,
>>     getColumnWithRank("data", 2) as 'data2,
>>     getColumnWithRank("data", 3) as 'data3,
>>     getColumnWithRank("extra", 1) as 'extra1,
>>     getColumnWithRank("extra", 2) as 'extra2,
>>     getColumnWithRank("extra", 3) as 'extra3)
>>
>>
>> modCollapsed.explain
>>
>> == Physical Plan ==
>> SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN
>> (rank#965 = 1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965
>> = 2) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN
>> data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281
>> ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE
>> null END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null
>> END, true)])
>> +- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE
>> WHEN (rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 3) THEN extra#281 ELSE null END, true)])
>>    +- *Project [id#279, name#280, extra#281, data#282, rank#965]
>>       +- Window [denserank(priority#283) windowspecdefinition(id#279,
>> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
>> ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
>>          +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>>             +- Exchange hashpartitioning(id#279, name#280, 200)
>>                +- Scan ExistingRDD[id#279,name#280,
>> extra#281,data#282,priority#283]
>>
>>
>>
>>
>> I'd also check out the execution plan (with explain) to see how it's
>> gonna work at runtime. I may have seen groupBy + join be better than
>> window (there were more exchanges in play for windows I reckon).
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ev...@nuna.com>
>> wrote:
>> >
>> >
>> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> >> help here too.
>> >
>> >
>> > This seems to work, but I do feel vaguely uneasy about it. :)
>> >
>> > // First add a 'rank' column which is priority order just in case
>> priorities
>> > aren't
>> > // from 1 with no gaps.
>> > val temp1 = data.withColumn("rank", functions.dense_rank()
>> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
>> >
>> > +---+----+-----+------+--------+----+
>> > | id|name|extra|  data|priority|rank|
>> > +---+----+-----+------+--------+----+
>> > |  1|Fred|    8|value1|       1|   1|
>> > |  1|Fred|    8|value8|       2|   2|
>> > |  1|Fred|    8|value5|       3|   3|
>> > |  2| Amy|    9|value3|       1|   1|
>> > |  2| Amy|    9|value5|       2|   2|
>> > +---+----+-----+------+--------+----+
>> >
>> > // Now move all the columns we want to denormalize into a struct column
>> to
>> > keep them together.
>> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
>> > temp1("data"), temp1("priority")))
>> >   .drop("extra", "data", "priority")
>> >
>> > +---+----+----+------------+
>> > | id|name|rank| temp_struct|
>> > +---+----+----+------------+
>> > |  1|Fred|   1|[8,value1,1]|
>> > |  1|Fred|   2|[8,value8,2]|
>> > |  1|Fred|   3|[8,value5,3]|
>> > |  2| Amy|   1|[9,value3,1]|
>> > |  2| Amy|   2|[9,value5,2]|
>> > +---+----+----+------------+
>> >
>> > // groupBy, again, but now pivot the rank column. We need an aggregate
>> > function after pivot,
>> > // so use first -- there will only ever be one element.
>> > val temp3 = temp2.groupBy("id", "name")
>> >   .pivot("rank", Seq("1", "2", "3"))
>> >   .agg(functions.first("temp_struct"))
>> >
>> > +---+----+------------+------------+------------+
>> > | id|name|           1|           2|           3|
>> > +---+----+------------+------------+------------+
>> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
>> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
>> > +---+----+------------+------------+------------+
>> >
>> > // Now just moving things out of the structs and clean up.
>> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>> >      .withColumn("data1", temp3("1").getField("data"))
>> >      .withColumn("priority1", temp3("1").getField("priority"))
>> >      .withColumn("extra2", temp3("2").getField("extra"))
>> >      .withColumn("data2", temp3("2").getField("data"))
>> >      .withColumn("priority2", temp3("2").getField("priority"))
>> >      .withColumn("extra3", temp3("3").getField("extra"))
>> >      .withColumn("data3", temp3("3").getField("data"))
>> >      .withColumn("priority3", temp3("3").getField("priority"))
>> >      .drop("1", "2", "3")
>> >
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
>> > data3|priority3|
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|
>> > 3|
>> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|
>> > null|
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >>
>> >>
>> >> Jacek
>> >>
>> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ev...@nuna.com.invalid>
>> >> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm trying to un-explode or denormalize a table like
>> >>>
>> >>> +---+----+-----+------+--------+
>> >>> |id |name|extra|data  |priority|
>> >>> +---+----+-----+------+--------+
>> >>> |1  |Fred|8    |value1|1       |
>> >>> |1  |Fred|8    |value8|2       |
>> >>> |1  |Fred|8    |value5|3       |
>> >>> |2  |Amy |9    |value3|1       |
>> >>> |2  |Amy |9    |value5|2       |
>> >>> +---+----+-----+------+--------+
>> >>>
>> >>> into something that looks like
>> >>>
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
>> >>> |priority3|
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8
>>  |value5|3
>> >>> |
>> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
>> >>> |null     |
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>>
>> >>> If I were going the other direction, I'd create a new column with an
>> >>> array of structs, each with 'extra', 'data', and 'priority' fields
>> and then
>> >>> explode it.
>> >>>
>> >>> Going from the more normalized view, though, I'm having a harder time.
>> >>>
>> >>> I want to group or partition by (id, name) and order by priority, but
>> >>> after that I can't figure out how to get multiple rows rotated into
>> one.
>> >>>
>> >>> Any ideas?
>> >>>
>> >>> Here's the code to create the input table above:
>> >>>
>> >>> import org.apache.spark.sql.Row
>> >>> import org.apache.spark.sql.Dataset
>> >>> import org.apache.spark.sql.types._
>> >>>
>> >>> val rowsRDD = sc.parallelize(Seq(
>> >>>     Row(1, "Fred", 8, "value1", 1),
>> >>>     Row(1, "Fred", 8, "value8", 2),
>> >>>     Row(1, "Fred", 8, "value5", 3),
>> >>>     Row(2, "Amy", 9, "value3", 1),
>> >>>     Row(2, "Amy", 9, "value5", 2)))
>> >>>
>> >>> val schema = StructType(Seq(
>> >>>     StructField("id", IntegerType, nullable = true),
>> >>>     StructField("name", StringType, nullable = true),
>> >>>     StructField("extra", IntegerType, nullable = true),
>> >>>     StructField("data", StringType, nullable = true),
>> >>>     StructField("priority", IntegerType, nullable = true)))
>> >>>
>> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>> >>>
>> >>>
>> >>>
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>>
>>
>>
>> --
> Best Regards,
> Ayan Guha
>

Re: Un-exploding / denormalizing Spark SQL help

Posted by ayan guha <gu...@gmail.com>.
Will a sql solution will be acceptable?
On Thu, 9 Feb 2017 at 4:01 am, Xiaomeng Wan <sh...@gmail.com> wrote:

> You could also try pivot.
>
> On 7 February 2017 at 16:13, Everett Anderson <ev...@nuna.com.invalid>
> wrote:
>
>
>
> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
> I think the fastest way is likely to use a combination of conditionals
> (when / otherwise), first (ignoring nulls), while grouping by the id.
> This should get the answer with only a single shuffle.
>
> Here is an example
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
> .
>
>
> Very cool! Using the simpler aggregates feels cleaner.
>
>
>
> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> Hi Everett,
>
> That's pretty much what I'd do. Can't think of a way to beat your
> solution. Why do you "feel vaguely uneasy about it"?
>
>
> Maybe it felt like I was unnecessarily grouping-by twice, but probably
> mostly that I hadn't used pivot before.
>
> Interestingly, the physical plans are not especially different between
> these two solutions after the rank column is added. They both have two
> SortAggregates that seem to be figuring out where to put results based on
> the rank:
>
> My original one:
>
> == Physical Plan ==
> *Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
> data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
> 2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
> extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
> +- SortAggregate(key=[id#279,name#280], functions=[first(if
> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
> true),first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else
> null, true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312
> else null, true)])
>    +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
> true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
> else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
> temp_struct#312 else null, true)])
>       +- *Project [id#279, name#280, rank#292, struct(extra#281, data#282,
> priority#283) AS temp_struct#312]
>          +- Window [denserank(priority#283) windowspecdefinition(id#279,
> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
> ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
>             +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>                +- Exchange hashpartitioning(id#279, name#280, 200)
>                   +- Scan
> ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]
>
>
> And modifying Michael's slightly to use a rank:
>
> import org.apache.spark.sql.functions._
>
> def getColumnWithRank(column: String, rank: Int) = {
>   first(when(col("rank") === lit(rank), col(column)).otherwise(null),
> ignoreNulls = true)
> }
>
> val withRankColumn = data.withColumn("rank",
> functions.dense_rank().over(Window.partitionBy("id",
> "name").orderBy("priority")))
>
> val modCollapsed = withRankColumn
>   .groupBy($"id", $"name")
>   .agg(
>     getColumnWithRank("data", 1) as 'data1,
>     getColumnWithRank("data", 2) as 'data2,
>     getColumnWithRank("data", 3) as 'data3,
>     getColumnWithRank("extra", 1) as 'extra1,
>     getColumnWithRank("extra", 2) as 'extra2,
>     getColumnWithRank("extra", 3) as 'extra3)
>
>
> modCollapsed.explain
>
> == Physical Plan ==
> SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN (rank#965
> = 1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN
> data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN data#282
> ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE
> null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null
> END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END,
> true)])
> +- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE WHEN
> (rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
> (rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
> (rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
> (rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN
> (rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN
> (rank#965 = 3) THEN extra#281 ELSE null END, true)])
>    +- *Project [id#279, name#280, extra#281, data#282, rank#965]
>       +- Window [denserank(priority#283) windowspecdefinition(id#279,
> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
> ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
>          +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>             +- Exchange hashpartitioning(id#279, name#280, 200)
>                +- Scan
> ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]
>
>
>
>
> I'd also check out the execution plan (with explain) to see how it's
> gonna work at runtime. I may have seen groupBy + join be better than
> window (there were more exchanges in play for windows I reckon).
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ev...@nuna.com>
> wrote:
> >
> >
> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
> wrote:
> >>
> >> Hi,
> >>
> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
> >> help here too.
> >
> >
> > This seems to work, but I do feel vaguely uneasy about it. :)
> >
> > // First add a 'rank' column which is priority order just in case
> priorities
> > aren't
> > // from 1 with no gaps.
> > val temp1 = data.withColumn("rank", functions.dense_rank()
> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
> >
> > +---+----+-----+------+--------+----+
> > | id|name|extra|  data|priority|rank|
> > +---+----+-----+------+--------+----+
> > |  1|Fred|    8|value1|       1|   1|
> > |  1|Fred|    8|value8|       2|   2|
> > |  1|Fred|    8|value5|       3|   3|
> > |  2| Amy|    9|value3|       1|   1|
> > |  2| Amy|    9|value5|       2|   2|
> > +---+----+-----+------+--------+----+
> >
> > // Now move all the columns we want to denormalize into a struct column
> to
> > keep them together.
> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
> > temp1("data"), temp1("priority")))
> >   .drop("extra", "data", "priority")
> >
> > +---+----+----+------------+
> > | id|name|rank| temp_struct|
> > +---+----+----+------------+
> > |  1|Fred|   1|[8,value1,1]|
> > |  1|Fred|   2|[8,value8,2]|
> > |  1|Fred|   3|[8,value5,3]|
> > |  2| Amy|   1|[9,value3,1]|
> > |  2| Amy|   2|[9,value5,2]|
> > +---+----+----+------------+
> >
> > // groupBy, again, but now pivot the rank column. We need an aggregate
> > function after pivot,
> > // so use first -- there will only ever be one element.
> > val temp3 = temp2.groupBy("id", "name")
> >   .pivot("rank", Seq("1", "2", "3"))
> >   .agg(functions.first("temp_struct"))
> >
> > +---+----+------------+------------+------------+
> > | id|name|           1|           2|           3|
> > +---+----+------------+------------+------------+
> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
> > +---+----+------------+------------+------------+
> >
> > // Now just moving things out of the structs and clean up.
> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
> >      .withColumn("data1", temp3("1").getField("data"))
> >      .withColumn("priority1", temp3("1").getField("priority"))
> >      .withColumn("extra2", temp3("2").getField("extra"))
> >      .withColumn("data2", temp3("2").getField("data"))
> >      .withColumn("priority2", temp3("2").getField("priority"))
> >      .withColumn("extra3", temp3("3").getField("extra"))
> >      .withColumn("data3", temp3("3").getField("data"))
> >      .withColumn("priority3", temp3("3").getField("priority"))
> >      .drop("1", "2", "3")
> >
> >
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
> > data3|priority3|
> >
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|
> > 3|
> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|
> > null|
> >
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >
> >
> >
> >
> >
> >
> >
> >>
> >>
> >> Jacek
> >>
> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ev...@nuna.com.invalid>
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm trying to un-explode or denormalize a table like
> >>>
> >>> +---+----+-----+------+--------+
> >>> |id |name|extra|data  |priority|
> >>> +---+----+-----+------+--------+
> >>> |1  |Fred|8    |value1|1       |
> >>> |1  |Fred|8    |value8|2       |
> >>> |1  |Fred|8    |value5|3       |
> >>> |2  |Amy |9    |value3|1       |
> >>> |2  |Amy |9    |value5|2       |
> >>> +---+----+-----+------+--------+
> >>>
> >>> into something that looks like
> >>>
> >>>
> >>>
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
> >>> |priority3|
> >>>
> >>>
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8
>  |value5|3
> >>> |
> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
> >>> |null     |
> >>>
> >>>
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> >>>
> >>> If I were going the other direction, I'd create a new column with an
> >>> array of structs, each with 'extra', 'data', and 'priority' fields and
> then
> >>> explode it.
> >>>
> >>> Going from the more normalized view, though, I'm having a harder time.
> >>>
> >>> I want to group or partition by (id, name) and order by priority, but
> >>> after that I can't figure out how to get multiple rows rotated into
> one.
> >>>
> >>> Any ideas?
> >>>
> >>> Here's the code to create the input table above:
> >>>
> >>> import org.apache.spark.sql.Row
> >>> import org.apache.spark.sql.Dataset
> >>> import org.apache.spark.sql.types._
> >>>
> >>> val rowsRDD = sc.parallelize(Seq(
> >>>     Row(1, "Fred", 8, "value1", 1),
> >>>     Row(1, "Fred", 8, "value8", 2),
> >>>     Row(1, "Fred", 8, "value5", 3),
> >>>     Row(2, "Amy", 9, "value3", 1),
> >>>     Row(2, "Amy", 9, "value5", 2)))
> >>>
> >>> val schema = StructType(Seq(
> >>>     StructField("id", IntegerType, nullable = true),
> >>>     StructField("name", StringType, nullable = true),
> >>>     StructField("extra", IntegerType, nullable = true),
> >>>     StructField("data", StringType, nullable = true),
> >>>     StructField("priority", IntegerType, nullable = true)))
> >>>
> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
> >>>
> >>>
> >>>
> >
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>
>
>
> --
Best Regards,
Ayan Guha

Re: Un-exploding / denormalizing Spark SQL help

Posted by Xiaomeng Wan <sh...@gmail.com>.
You could also try pivot.

On 7 February 2017 at 16:13, Everett Anderson <ev...@nuna.com.invalid>
wrote:

>
>
> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
>> I think the fastest way is likely to use a combination of conditionals
>> (when / otherwise), first (ignoring nulls), while grouping by the id.
>> This should get the answer with only a single shuffle.
>>
>> Here is an example
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
>> .
>>
>
> Very cool! Using the simpler aggregates feels cleaner.
>
>
>>
>> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi Everett,
>>>
>>> That's pretty much what I'd do. Can't think of a way to beat your
>>> solution. Why do you "feel vaguely uneasy about it"?
>>>
>>
> Maybe it felt like I was unnecessarily grouping-by twice, but probably
> mostly that I hadn't used pivot before.
>
> Interestingly, the physical plans are not especially different between
> these two solutions after the rank column is added. They both have two
> SortAggregates that seem to be figuring out where to put results based on
> the rank:
>
> My original one:
>
> == Physical Plan ==
> *Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
> data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
> 2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
> extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
> +- SortAggregate(key=[id#279,name#280], functions=[first(if
> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
> true),first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else
> null, true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312
> else null, true)])
>    +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
> true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
> else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
> temp_struct#312 else null, true)])
>       +- *Project [id#279, name#280, rank#292, struct(extra#281, data#282,
> priority#283) AS temp_struct#312]
>          +- Window [denserank(priority#283) windowspecdefinition(id#279,
> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
> ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
>             +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>                +- Exchange hashpartitioning(id#279, name#280, 200)
>                   +- Scan ExistingRDD[id#279,name#280,
> extra#281,data#282,priority#283]
>
>
> And modifying Michael's slightly to use a rank:
>
> import org.apache.spark.sql.functions._
>
> def getColumnWithRank(column: String, rank: Int) = {
>   first(when(col("rank") === lit(rank), col(column)).otherwise(null),
> ignoreNulls = true)
> }
>
> val withRankColumn = data.withColumn("rank", functions.dense_rank().over(Window.partitionBy("id",
> "name").orderBy("priority")))
>
> val modCollapsed = withRankColumn
>   .groupBy($"id", $"name")
>   .agg(
>     getColumnWithRank("data", 1) as 'data1,
>     getColumnWithRank("data", 2) as 'data2,
>     getColumnWithRank("data", 3) as 'data3,
>     getColumnWithRank("extra", 1) as 'extra1,
>     getColumnWithRank("extra", 2) as 'extra2,
>     getColumnWithRank("extra", 3) as 'extra3)
>
>
> modCollapsed.explain
>
> == Physical Plan ==
> SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN (rank#965
> = 1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN
> data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN data#282
> ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE
> null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null
> END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END,
> true)])
> +- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE
> WHEN (rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE
> WHEN (rank#965 = 3) THEN extra#281 ELSE null END, true)])
>    +- *Project [id#279, name#280, extra#281, data#282, rank#965]
>       +- Window [denserank(priority#283) windowspecdefinition(id#279,
> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
> ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
>          +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>             +- Exchange hashpartitioning(id#279, name#280, 200)
>                +- Scan ExistingRDD[id#279,name#280,
> extra#281,data#282,priority#283]
>
>
>
>>
>>> I'd also check out the execution plan (with explain) to see how it's
>>> gonna work at runtime. I may have seen groupBy + join be better than
>>> window (there were more exchanges in play for windows I reckon).
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ev...@nuna.com>
>>> wrote:
>>> >
>>> >
>>> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
>>> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> Could groupBy and withColumn or UDAF work perhaps? I think window
>>> could
>>> >> help here too.
>>> >
>>> >
>>> > This seems to work, but I do feel vaguely uneasy about it. :)
>>> >
>>> > // First add a 'rank' column which is priority order just in case
>>> priorities
>>> > aren't
>>> > // from 1 with no gaps.
>>> > val temp1 = data.withColumn("rank", functions.dense_rank()
>>> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
>>> >
>>> > +---+----+-----+------+--------+----+
>>> > | id|name|extra|  data|priority|rank|
>>> > +---+----+-----+------+--------+----+
>>> > |  1|Fred|    8|value1|       1|   1|
>>> > |  1|Fred|    8|value8|       2|   2|
>>> > |  1|Fred|    8|value5|       3|   3|
>>> > |  2| Amy|    9|value3|       1|   1|
>>> > |  2| Amy|    9|value5|       2|   2|
>>> > +---+----+-----+------+--------+----+
>>> >
>>> > // Now move all the columns we want to denormalize into a struct
>>> column to
>>> > keep them together.
>>> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
>>> > temp1("data"), temp1("priority")))
>>> >   .drop("extra", "data", "priority")
>>> >
>>> > +---+----+----+------------+
>>> > | id|name|rank| temp_struct|
>>> > +---+----+----+------------+
>>> > |  1|Fred|   1|[8,value1,1]|
>>> > |  1|Fred|   2|[8,value8,2]|
>>> > |  1|Fred|   3|[8,value5,3]|
>>> > |  2| Amy|   1|[9,value3,1]|
>>> > |  2| Amy|   2|[9,value5,2]|
>>> > +---+----+----+------------+
>>> >
>>> > // groupBy, again, but now pivot the rank column. We need an aggregate
>>> > function after pivot,
>>> > // so use first -- there will only ever be one element.
>>> > val temp3 = temp2.groupBy("id", "name")
>>> >   .pivot("rank", Seq("1", "2", "3"))
>>> >   .agg(functions.first("temp_struct"))
>>> >
>>> > +---+----+------------+------------+------------+
>>> > | id|name|           1|           2|           3|
>>> > +---+----+------------+------------+------------+
>>> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
>>> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
>>> > +---+----+------------+------------+------------+
>>> >
>>> > // Now just moving things out of the structs and clean up.
>>> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>>> >      .withColumn("data1", temp3("1").getField("data"))
>>> >      .withColumn("priority1", temp3("1").getField("priority"))
>>> >      .withColumn("extra2", temp3("2").getField("extra"))
>>> >      .withColumn("data2", temp3("2").getField("data"))
>>> >      .withColumn("priority2", temp3("2").getField("priority"))
>>> >      .withColumn("extra3", temp3("3").getField("extra"))
>>> >      .withColumn("data3", temp3("3").getField("data"))
>>> >      .withColumn("priority3", temp3("3").getField("priority"))
>>> >      .drop("1", "2", "3")
>>> >
>>> > +---+----+------+------+---------+------+------+---------+--
>>> ----+------+---------+
>>> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
>>> > data3|priority3|
>>> > +---+----+------+------+---------+------+------+---------+--
>>> ----+------+---------+
>>> > |  1|Fred|     8|value1|        1|     8|value8|        2|
>>>  8|value5|
>>> > 3|
>>> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|
>>> null|
>>> > null|
>>> > +---+----+------+------+---------+------+------+---------+--
>>> ----+------+---------+
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >>
>>> >>
>>> >> Jacek
>>> >>
>>> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <everett@nuna.com.invalid
>>> >
>>> >> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I'm trying to un-explode or denormalize a table like
>>> >>>
>>> >>> +---+----+-----+------+--------+
>>> >>> |id |name|extra|data  |priority|
>>> >>> +---+----+-----+------+--------+
>>> >>> |1  |Fred|8    |value1|1       |
>>> >>> |1  |Fred|8    |value8|2       |
>>> >>> |1  |Fred|8    |value5|3       |
>>> >>> |2  |Amy |9    |value3|1       |
>>> >>> |2  |Amy |9    |value5|2       |
>>> >>> +---+----+-----+------+--------+
>>> >>>
>>> >>> into something that looks like
>>> >>>
>>> >>>
>>> >>> +---+----+------+------+---------+------+------+---------+--
>>> ----+------+---------+
>>> >>> |id |name|extra1|data1 |priority1|extra2|data2
>>> |priority2|extra3|data3
>>> >>> |priority3|
>>> >>>
>>> >>> +---+----+------+------+---------+------+------+---------+--
>>> ----+------+---------+
>>> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8
>>>  |value5|3
>>> >>> |
>>> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
>>> >>> |null     |
>>> >>>
>>> >>> +---+----+------+------+---------+------+------+---------+--
>>> ----+------+---------+
>>> >>>
>>> >>> If I were going the other direction, I'd create a new column with an
>>> >>> array of structs, each with 'extra', 'data', and 'priority' fields
>>> and then
>>> >>> explode it.
>>> >>>
>>> >>> Going from the more normalized view, though, I'm having a harder
>>> time.
>>> >>>
>>> >>> I want to group or partition by (id, name) and order by priority, but
>>> >>> after that I can't figure out how to get multiple rows rotated into
>>> one.
>>> >>>
>>> >>> Any ideas?
>>> >>>
>>> >>> Here's the code to create the input table above:
>>> >>>
>>> >>> import org.apache.spark.sql.Row
>>> >>> import org.apache.spark.sql.Dataset
>>> >>> import org.apache.spark.sql.types._
>>> >>>
>>> >>> val rowsRDD = sc.parallelize(Seq(
>>> >>>     Row(1, "Fred", 8, "value1", 1),
>>> >>>     Row(1, "Fred", 8, "value8", 2),
>>> >>>     Row(1, "Fred", 8, "value5", 3),
>>> >>>     Row(2, "Amy", 9, "value3", 1),
>>> >>>     Row(2, "Amy", 9, "value5", 2)))
>>> >>>
>>> >>> val schema = StructType(Seq(
>>> >>>     StructField("id", IntegerType, nullable = true),
>>> >>>     StructField("name", StringType, nullable = true),
>>> >>>     StructField("extra", IntegerType, nullable = true),
>>> >>>     StructField("data", StringType, nullable = true),
>>> >>>     StructField("priority", IntegerType, nullable = true)))
>>> >>>
>>> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>> >>>
>>> >>>
>>> >>>
>>> >
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>
>>>
>>
>

Re: Un-exploding / denormalizing Spark SQL help

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> I think the fastest way is likely to use a combination of conditionals
> (when / otherwise), first (ignoring nulls), while grouping by the id.
> This should get the answer with only a single shuffle.
>
> Here is an example
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
> .
>

Very cool! Using the simpler aggregates feels cleaner.


>
> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi Everett,
>>
>> That's pretty much what I'd do. Can't think of a way to beat your
>> solution. Why do you "feel vaguely uneasy about it"?
>>
>
Maybe it felt like I was unnecessarily grouping-by twice, but probably
mostly that I hadn't used pivot before.

Interestingly, the physical plans are not especially different between
these two solutions after the rank column is added. They both have two
SortAggregates that seem to be figuring out where to put results based on
the rank:

My original one:

== Physical Plan ==
*Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
+- SortAggregate(key=[id#279,name#280], functions=[first(if ((cast(rank#292
as double) = 1.0)) temp_struct#312 else null, true),first(if
((cast(rank#292 as double) = 2.0)) temp_struct#312 else null,
true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312 else
null, true)])
   +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
temp_struct#312 else null, true)])
      +- *Project [id#279, name#280, rank#292, struct(extra#281, data#282,
priority#283) AS temp_struct#312]
         +- Window [denserank(priority#283) windowspecdefinition(id#279,
name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
            +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
               +- Exchange hashpartitioning(id#279, name#280, 200)
                  +- Scan
ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]


And modifying Michael's slightly to use a rank:

import org.apache.spark.sql.functions._

def getColumnWithRank(column: String, rank: Int) = {
  first(when(col("rank") === lit(rank), col(column)).otherwise(null),
ignoreNulls = true)
}

val withRankColumn = data.withColumn("rank",
functions.dense_rank().over(Window.partitionBy("id",
"name").orderBy("priority")))

val modCollapsed = withRankColumn
  .groupBy($"id", $"name")
  .agg(
    getColumnWithRank("data", 1) as 'data1,
    getColumnWithRank("data", 2) as 'data2,
    getColumnWithRank("data", 3) as 'data3,
    getColumnWithRank("extra", 1) as 'extra1,
    getColumnWithRank("extra", 2) as 'extra2,
    getColumnWithRank("extra", 3) as 'extra3)


modCollapsed.explain

== Physical Plan ==
SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN (rank#965 =
1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN
data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN data#282
ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281 ELSE
null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE null
END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null END,
true)])
+- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE WHEN
(rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE WHEN
(rank#965 = 3) THEN extra#281 ELSE null END, true)])
   +- *Project [id#279, name#280, extra#281, data#282, rank#965]
      +- Window [denserank(priority#283) windowspecdefinition(id#279,
name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
         +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
            +- Exchange hashpartitioning(id#279, name#280, 200)
               +- Scan
ExistingRDD[id#279,name#280,extra#281,data#282,priority#283]



>
>> I'd also check out the execution plan (with explain) to see how it's
>> gonna work at runtime. I may have seen groupBy + join be better than
>> window (there were more exchanges in play for windows I reckon).
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ev...@nuna.com>
>> wrote:
>> >
>> >
>> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> >> help here too.
>> >
>> >
>> > This seems to work, but I do feel vaguely uneasy about it. :)
>> >
>> > // First add a 'rank' column which is priority order just in case
>> priorities
>> > aren't
>> > // from 1 with no gaps.
>> > val temp1 = data.withColumn("rank", functions.dense_rank()
>> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
>> >
>> > +---+----+-----+------+--------+----+
>> > | id|name|extra|  data|priority|rank|
>> > +---+----+-----+------+--------+----+
>> > |  1|Fred|    8|value1|       1|   1|
>> > |  1|Fred|    8|value8|       2|   2|
>> > |  1|Fred|    8|value5|       3|   3|
>> > |  2| Amy|    9|value3|       1|   1|
>> > |  2| Amy|    9|value5|       2|   2|
>> > +---+----+-----+------+--------+----+
>> >
>> > // Now move all the columns we want to denormalize into a struct column
>> to
>> > keep them together.
>> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
>> > temp1("data"), temp1("priority")))
>> >   .drop("extra", "data", "priority")
>> >
>> > +---+----+----+------------+
>> > | id|name|rank| temp_struct|
>> > +---+----+----+------------+
>> > |  1|Fred|   1|[8,value1,1]|
>> > |  1|Fred|   2|[8,value8,2]|
>> > |  1|Fred|   3|[8,value5,3]|
>> > |  2| Amy|   1|[9,value3,1]|
>> > |  2| Amy|   2|[9,value5,2]|
>> > +---+----+----+------------+
>> >
>> > // groupBy, again, but now pivot the rank column. We need an aggregate
>> > function after pivot,
>> > // so use first -- there will only ever be one element.
>> > val temp3 = temp2.groupBy("id", "name")
>> >   .pivot("rank", Seq("1", "2", "3"))
>> >   .agg(functions.first("temp_struct"))
>> >
>> > +---+----+------------+------------+------------+
>> > | id|name|           1|           2|           3|
>> > +---+----+------------+------------+------------+
>> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
>> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
>> > +---+----+------------+------------+------------+
>> >
>> > // Now just moving things out of the structs and clean up.
>> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>> >      .withColumn("data1", temp3("1").getField("data"))
>> >      .withColumn("priority1", temp3("1").getField("priority"))
>> >      .withColumn("extra2", temp3("2").getField("extra"))
>> >      .withColumn("data2", temp3("2").getField("data"))
>> >      .withColumn("priority2", temp3("2").getField("priority"))
>> >      .withColumn("extra3", temp3("3").getField("extra"))
>> >      .withColumn("data3", temp3("3").getField("data"))
>> >      .withColumn("priority3", temp3("3").getField("priority"))
>> >      .drop("1", "2", "3")
>> >
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
>> > data3|priority3|
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|
>> > 3|
>> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|
>> > null|
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >>
>> >>
>> >> Jacek
>> >>
>> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ev...@nuna.com.invalid>
>> >> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm trying to un-explode or denormalize a table like
>> >>>
>> >>> +---+----+-----+------+--------+
>> >>> |id |name|extra|data  |priority|
>> >>> +---+----+-----+------+--------+
>> >>> |1  |Fred|8    |value1|1       |
>> >>> |1  |Fred|8    |value8|2       |
>> >>> |1  |Fred|8    |value5|3       |
>> >>> |2  |Amy |9    |value3|1       |
>> >>> |2  |Amy |9    |value5|2       |
>> >>> +---+----+-----+------+--------+
>> >>>
>> >>> into something that looks like
>> >>>
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
>> >>> |priority3|
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8
>>  |value5|3
>> >>> |
>> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
>> >>> |null     |
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>>
>> >>> If I were going the other direction, I'd create a new column with an
>> >>> array of structs, each with 'extra', 'data', and 'priority' fields
>> and then
>> >>> explode it.
>> >>>
>> >>> Going from the more normalized view, though, I'm having a harder time.
>> >>>
>> >>> I want to group or partition by (id, name) and order by priority, but
>> >>> after that I can't figure out how to get multiple rows rotated into
>> one.
>> >>>
>> >>> Any ideas?
>> >>>
>> >>> Here's the code to create the input table above:
>> >>>
>> >>> import org.apache.spark.sql.Row
>> >>> import org.apache.spark.sql.Dataset
>> >>> import org.apache.spark.sql.types._
>> >>>
>> >>> val rowsRDD = sc.parallelize(Seq(
>> >>>     Row(1, "Fred", 8, "value1", 1),
>> >>>     Row(1, "Fred", 8, "value8", 2),
>> >>>     Row(1, "Fred", 8, "value5", 3),
>> >>>     Row(2, "Amy", 9, "value3", 1),
>> >>>     Row(2, "Amy", 9, "value5", 2)))
>> >>>
>> >>> val schema = StructType(Seq(
>> >>>     StructField("id", IntegerType, nullable = true),
>> >>>     StructField("name", StringType, nullable = true),
>> >>>     StructField("extra", IntegerType, nullable = true),
>> >>>     StructField("data", StringType, nullable = true),
>> >>>     StructField("priority", IntegerType, nullable = true)))
>> >>>
>> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>> >>>
>> >>>
>> >>>
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>>
>

Re: Un-exploding / denormalizing Spark SQL help

Posted by Michael Armbrust <mi...@databricks.com>.
I think the fastest way is likely to use a combination of conditionals
(when / otherwise), first (ignoring nulls), while grouping by the id.  This
should get the answer with only a single shuffle.

Here is an example
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
.

On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Everett,
>
> That's pretty much what I'd do. Can't think of a way to beat your
> solution. Why do you "feel vaguely uneasy about it"?
>
> I'd also check out the execution plan (with explain) to see how it's
> gonna work at runtime. I may have seen groupBy + join be better than
> window (there were more exchanges in play for windows I reckon).
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ev...@nuna.com>
> wrote:
> >
> >
> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
> wrote:
> >>
> >> Hi,
> >>
> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
> >> help here too.
> >
> >
> > This seems to work, but I do feel vaguely uneasy about it. :)
> >
> > // First add a 'rank' column which is priority order just in case
> priorities
> > aren't
> > // from 1 with no gaps.
> > val temp1 = data.withColumn("rank", functions.dense_rank()
> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
> >
> > +---+----+-----+------+--------+----+
> > | id|name|extra|  data|priority|rank|
> > +---+----+-----+------+--------+----+
> > |  1|Fred|    8|value1|       1|   1|
> > |  1|Fred|    8|value8|       2|   2|
> > |  1|Fred|    8|value5|       3|   3|
> > |  2| Amy|    9|value3|       1|   1|
> > |  2| Amy|    9|value5|       2|   2|
> > +---+----+-----+------+--------+----+
> >
> > // Now move all the columns we want to denormalize into a struct column
> to
> > keep them together.
> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
> > temp1("data"), temp1("priority")))
> >   .drop("extra", "data", "priority")
> >
> > +---+----+----+------------+
> > | id|name|rank| temp_struct|
> > +---+----+----+------------+
> > |  1|Fred|   1|[8,value1,1]|
> > |  1|Fred|   2|[8,value8,2]|
> > |  1|Fred|   3|[8,value5,3]|
> > |  2| Amy|   1|[9,value3,1]|
> > |  2| Amy|   2|[9,value5,2]|
> > +---+----+----+------------+
> >
> > // groupBy, again, but now pivot the rank column. We need an aggregate
> > function after pivot,
> > // so use first -- there will only ever be one element.
> > val temp3 = temp2.groupBy("id", "name")
> >   .pivot("rank", Seq("1", "2", "3"))
> >   .agg(functions.first("temp_struct"))
> >
> > +---+----+------------+------------+------------+
> > | id|name|           1|           2|           3|
> > +---+----+------------+------------+------------+
> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
> > +---+----+------------+------------+------------+
> >
> > // Now just moving things out of the structs and clean up.
> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
> >      .withColumn("data1", temp3("1").getField("data"))
> >      .withColumn("priority1", temp3("1").getField("priority"))
> >      .withColumn("extra2", temp3("2").getField("extra"))
> >      .withColumn("data2", temp3("2").getField("data"))
> >      .withColumn("priority2", temp3("2").getField("priority"))
> >      .withColumn("extra3", temp3("3").getField("extra"))
> >      .withColumn("data3", temp3("3").getField("data"))
> >      .withColumn("priority3", temp3("3").getField("priority"))
> >      .drop("1", "2", "3")
> >
> > +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
> > data3|priority3|
> > +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|
> > 3|
> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|
> > null|
> > +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
> >
> >
> >
> >
> >
> >
> >
> >>
> >>
> >> Jacek
> >>
> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ev...@nuna.com.invalid>
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm trying to un-explode or denormalize a table like
> >>>
> >>> +---+----+-----+------+--------+
> >>> |id |name|extra|data  |priority|
> >>> +---+----+-----+------+--------+
> >>> |1  |Fred|8    |value1|1       |
> >>> |1  |Fred|8    |value8|2       |
> >>> |1  |Fred|8    |value5|3       |
> >>> |2  |Amy |9    |value3|1       |
> >>> |2  |Amy |9    |value5|2       |
> >>> +---+----+-----+------+--------+
> >>>
> >>> into something that looks like
> >>>
> >>>
> >>> +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
> >>> |priority3|
> >>>
> >>> +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8
>  |value5|3
> >>> |
> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
> >>> |null     |
> >>>
> >>> +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
> >>>
> >>> If I were going the other direction, I'd create a new column with an
> >>> array of structs, each with 'extra', 'data', and 'priority' fields and
> then
> >>> explode it.
> >>>
> >>> Going from the more normalized view, though, I'm having a harder time.
> >>>
> >>> I want to group or partition by (id, name) and order by priority, but
> >>> after that I can't figure out how to get multiple rows rotated into
> one.
> >>>
> >>> Any ideas?
> >>>
> >>> Here's the code to create the input table above:
> >>>
> >>> import org.apache.spark.sql.Row
> >>> import org.apache.spark.sql.Dataset
> >>> import org.apache.spark.sql.types._
> >>>
> >>> val rowsRDD = sc.parallelize(Seq(
> >>>     Row(1, "Fred", 8, "value1", 1),
> >>>     Row(1, "Fred", 8, "value8", 2),
> >>>     Row(1, "Fred", 8, "value5", 3),
> >>>     Row(2, "Amy", 9, "value3", 1),
> >>>     Row(2, "Amy", 9, "value5", 2)))
> >>>
> >>> val schema = StructType(Seq(
> >>>     StructField("id", IntegerType, nullable = true),
> >>>     StructField("name", StringType, nullable = true),
> >>>     StructField("extra", IntegerType, nullable = true),
> >>>     StructField("data", StringType, nullable = true),
> >>>     StructField("priority", IntegerType, nullable = true)))
> >>>
> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
> >>>
> >>>
> >>>
> >
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>

Re: Un-exploding / denormalizing Spark SQL help

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi Everett,

That's pretty much what I'd do. Can't think of a way to beat your
solution. Why do you "feel vaguely uneasy about it"?

I'd also check out the execution plan (with explain) to see how it's
gonna work at runtime. I may have seen groupBy + join be better than
window (there were more exchanges in play for windows I reckon).

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ev...@nuna.com> wrote:
>
>
> On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> help here too.
>
>
> This seems to work, but I do feel vaguely uneasy about it. :)
>
> // First add a 'rank' column which is priority order just in case priorities
> aren't
> // from 1 with no gaps.
> val temp1 = data.withColumn("rank", functions.dense_rank()
>    .over(Window.partitionBy("id", "name").orderBy("priority")))
>
> +---+----+-----+------+--------+----+
> | id|name|extra|  data|priority|rank|
> +---+----+-----+------+--------+----+
> |  1|Fred|    8|value1|       1|   1|
> |  1|Fred|    8|value8|       2|   2|
> |  1|Fred|    8|value5|       3|   3|
> |  2| Amy|    9|value3|       1|   1|
> |  2| Amy|    9|value5|       2|   2|
> +---+----+-----+------+--------+----+
>
> // Now move all the columns we want to denormalize into a struct column to
> keep them together.
> val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
> temp1("data"), temp1("priority")))
>   .drop("extra", "data", "priority")
>
> +---+----+----+------------+
> | id|name|rank| temp_struct|
> +---+----+----+------------+
> |  1|Fred|   1|[8,value1,1]|
> |  1|Fred|   2|[8,value8,2]|
> |  1|Fred|   3|[8,value5,3]|
> |  2| Amy|   1|[9,value3,1]|
> |  2| Amy|   2|[9,value5,2]|
> +---+----+----+------------+
>
> // groupBy, again, but now pivot the rank column. We need an aggregate
> function after pivot,
> // so use first -- there will only ever be one element.
> val temp3 = temp2.groupBy("id", "name")
>   .pivot("rank", Seq("1", "2", "3"))
>   .agg(functions.first("temp_struct"))
>
> +---+----+------------+------------+------------+
> | id|name|           1|           2|           3|
> +---+----+------------+------------+------------+
> |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
> +---+----+------------+------------+------------+
>
> // Now just moving things out of the structs and clean up.
> val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>      .withColumn("data1", temp3("1").getField("data"))
>      .withColumn("priority1", temp3("1").getField("priority"))
>      .withColumn("extra2", temp3("2").getField("extra"))
>      .withColumn("data2", temp3("2").getField("data"))
>      .withColumn("priority2", temp3("2").getField("priority"))
>      .withColumn("extra3", temp3("3").getField("extra"))
>      .withColumn("data3", temp3("3").getField("data"))
>      .withColumn("priority3", temp3("3").getField("priority"))
>      .drop("1", "2", "3")
>
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
> data3|priority3|
> +---+----+------+------+---------+------+------+---------+------+------+---------+
> |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|
> 3|
> |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|
> null|
> +---+----+------+------+---------+------+------+---------+------+------+---------+
>
>
>
>
>
>
>
>>
>>
>> Jacek
>>
>> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ev...@nuna.com.invalid>
>> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to un-explode or denormalize a table like
>>>
>>> +---+----+-----+------+--------+
>>> |id |name|extra|data  |priority|
>>> +---+----+-----+------+--------+
>>> |1  |Fred|8    |value1|1       |
>>> |1  |Fred|8    |value8|2       |
>>> |1  |Fred|8    |value5|3       |
>>> |2  |Amy |9    |value3|1       |
>>> |2  |Amy |9    |value5|2       |
>>> +---+----+-----+------+--------+
>>>
>>> into something that looks like
>>>
>>>
>>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
>>> |priority3|
>>>
>>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3
>>> |
>>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
>>> |null     |
>>>
>>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>>
>>> If I were going the other direction, I'd create a new column with an
>>> array of structs, each with 'extra', 'data', and 'priority' fields and then
>>> explode it.
>>>
>>> Going from the more normalized view, though, I'm having a harder time.
>>>
>>> I want to group or partition by (id, name) and order by priority, but
>>> after that I can't figure out how to get multiple rows rotated into one.
>>>
>>> Any ideas?
>>>
>>> Here's the code to create the input table above:
>>>
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.Dataset
>>> import org.apache.spark.sql.types._
>>>
>>> val rowsRDD = sc.parallelize(Seq(
>>>     Row(1, "Fred", 8, "value1", 1),
>>>     Row(1, "Fred", 8, "value8", 2),
>>>     Row(1, "Fred", 8, "value5", 3),
>>>     Row(2, "Amy", 9, "value3", 1),
>>>     Row(2, "Amy", 9, "value5", 2)))
>>>
>>> val schema = StructType(Seq(
>>>     StructField("id", IntegerType, nullable = true),
>>>     StructField("name", StringType, nullable = true),
>>>     StructField("extra", IntegerType, nullable = true),
>>>     StructField("data", StringType, nullable = true),
>>>     StructField("priority", IntegerType, nullable = true)))
>>>
>>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>>
>>>
>>>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Un-exploding / denormalizing Spark SQL help

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Could groupBy and withColumn or UDAF work perhaps? I think window could
> help here too.
>

This seems to work, but I do feel vaguely uneasy about it. :)

// First add a 'rank' column which is priority order just in case
priorities aren't
// from 1 with no gaps.
val temp1 = data.withColumn("rank", functions.dense_rank()
   .over(Window.partitionBy("id", "name").orderBy("priority")))

+---+----+-----+------+--------+----+
| id|name|extra|  data|priority|rank|
+---+----+-----+------+--------+----+
|  1|Fred|    8|value1|       1|   1|
|  1|Fred|    8|value8|       2|   2|
|  1|Fred|    8|value5|       3|   3|
|  2| Amy|    9|value3|       1|   1|
|  2| Amy|    9|value5|       2|   2|
+---+----+-----+------+--------+----+

// Now move all the columns we want to denormalize into a struct column to
keep them together.
val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
temp1("data"), temp1("priority")))
  .drop("extra", "data", "priority")

+---+----+----+------------+
| id|name|rank| temp_struct|
+---+----+----+------------+
|  1|Fred|   1|[8,value1,1]|
|  1|Fred|   2|[8,value8,2]|
|  1|Fred|   3|[8,value5,3]|
|  2| Amy|   1|[9,value3,1]|
|  2| Amy|   2|[9,value5,2]|
+---+----+----+------------+

// groupBy, again, but now pivot the rank column. We need an aggregate
function after pivot,
// so use first -- there will only ever be one element.
val temp3 = temp2.groupBy("id", "name")
  .pivot("rank", Seq("1", "2", "3"))
  .agg(functions.first("temp_struct"))

+---+----+------------+------------+------------+
| id|name|           1|           2|           3|
+---+----+------------+------------+------------+
|  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
|  2| Amy|[9,value3,1]|[9,value5,2]|        null|
+---+----+------------+------------+------------+

// Now just moving things out of the structs and clean up.
val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
     .withColumn("data1", temp3("1").getField("data"))
     .withColumn("priority1", temp3("1").getField("priority"))
     .withColumn("extra2", temp3("2").getField("extra"))
     .withColumn("data2", temp3("2").getField("data"))
     .withColumn("priority2", temp3("2").getField("priority"))
     .withColumn("extra3", temp3("3").getField("extra"))
     .withColumn("data3", temp3("3").getField("data"))
     .withColumn("priority3", temp3("3").getField("priority"))
     .drop("1", "2", "3")

+---+----+------+------+---------+------+------+---------+------+------+---------+
| id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
data3|priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|
     3|
|  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|
  null|
+---+----+------+------+---------+------+------+---------+------+------+---------+








>
> Jacek
>
> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ev...@nuna.com.invalid>
> wrote:
>
>> Hi,
>>
>> I'm trying to un-explode or denormalize a table like
>>
>> +---+----+-----+------+--------+
>> |id |name|extra|data  |priority|
>> +---+----+-----+------+--------+
>> |1  |Fred|8    |value1|1       |
>> |1  |Fred|8    |value8|2       |
>> |1  |Fred|8    |value5|3       |
>> |2  |Amy |9    |value3|1       |
>> |2  |Amy |9    |value5|2       |
>> +---+----+-----+------+--------+
>>
>> into something that looks like
>>
>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
>> |priority3|
>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3
>>        |
>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
>>  |null     |
>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>>
>> If I were going the other direction, I'd create a new column with an
>> array of structs, each with 'extra', 'data', and 'priority' fields and then
>> explode it.
>>
>> Going from the more normalized view, though, I'm having a harder time.
>>
>> I want to group or partition by (id, name) and order by priority, but
>> after that I can't figure out how to get multiple rows rotated into one.
>>
>> Any ideas?
>>
>> Here's the code to create the input table above:
>>
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.Dataset
>> import org.apache.spark.sql.types._
>>
>> val rowsRDD = sc.parallelize(Seq(
>>     Row(1, "Fred", 8, "value1", 1),
>>     Row(1, "Fred", 8, "value8", 2),
>>     Row(1, "Fred", 8, "value5", 3),
>>     Row(2, "Amy", 9, "value3", 1),
>>     Row(2, "Amy", 9, "value5", 2)))
>>
>> val schema = StructType(Seq(
>>     StructField("id", IntegerType, nullable = true),
>>     StructField("name", StringType, nullable = true),
>>     StructField("extra", IntegerType, nullable = true),
>>     StructField("data", StringType, nullable = true),
>>     StructField("priority", IntegerType, nullable = true)))
>>
>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>
>>
>>
>>

Re: Un-exploding / denormalizing Spark SQL help

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

Could groupBy and withColumn or UDAF work perhaps? I think window could
help here too.

Jacek

On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ev...@nuna.com.invalid>
wrote:

> Hi,
>
> I'm trying to un-explode or denormalize a table like
>
> +---+----+-----+------+--------+
> |id |name|extra|data  |priority|
> +---+----+-----+------+--------+
> |1  |Fred|8    |value1|1       |
> |1  |Fred|8    |value8|2       |
> |1  |Fred|8    |value5|3       |
> |2  |Amy |9    |value3|1       |
> |2  |Amy |9    |value5|2       |
> +---+----+-----+------+--------+
>
> into something that looks like
>
> +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
> |priority3|
> +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3
>        |
> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
>  |null     |
> +---+----+------+------+---------+------+------+---------+--
> ----+------+---------+
>
> If I were going the other direction, I'd create a new column with an array
> of structs, each with 'extra', 'data', and 'priority' fields and then
> explode it.
>
> Going from the more normalized view, though, I'm having a harder time.
>
> I want to group or partition by (id, name) and order by priority, but
> after that I can't figure out how to get multiple rows rotated into one.
>
> Any ideas?
>
> Here's the code to create the input table above:
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
>
> val rowsRDD = sc.parallelize(Seq(
>     Row(1, "Fred", 8, "value1", 1),
>     Row(1, "Fred", 8, "value8", 2),
>     Row(1, "Fred", 8, "value5", 3),
>     Row(2, "Amy", 9, "value3", 1),
>     Row(2, "Amy", 9, "value5", 2)))
>
> val schema = StructType(Seq(
>     StructField("id", IntegerType, nullable = true),
>     StructField("name", StringType, nullable = true),
>     StructField("extra", IntegerType, nullable = true),
>     StructField("data", StringType, nullable = true),
>     StructField("priority", IntegerType, nullable = true)))
>
> val data = sqlContext.createDataFrame(rowsRDD, schema)
>
>
>
>