Posted to user@spark.apache.org by Isabelle Phan <nl...@gmail.com> on 2015/10/21 02:23:08 UTC

How to distinguish columns when joining DataFrames with shared parent?

Hello,

When joining 2 DataFrames which originate from the same initial DataFrame,
why can't the org.apache.spark.sql.DataFrame.apply(colName: String) method
distinguish which column to read?

Let me illustrate this question with a simple example (run on Spark 1.5.1):

//my initial DataFrame
scala> df
res39: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> df.show
+---+-----+
|key|value|
+---+-----+
|  1|    1|
|  1|   10|
|  2|    3|
|  3|   20|
|  3|    5|
|  4|   10|
+---+-----+


//2 children DataFrames
scala> val smallValues = df.filter('value < 10)
smallValues: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> smallValues.show
+---+-----+
|key|value|
+---+-----+
|  1|    1|
|  2|    3|
|  3|    5|
+---+-----+


scala> val largeValues = df.filter('value >= 10)
largeValues: org.apache.spark.sql.DataFrame = [key: int, value: int]

scala> largeValues.show
+---+-----+
|key|value|
+---+-----+
|  1|   10|
|  3|   20|
|  4|   10|
+---+-----+


//Joining the children
scala> smallValues
  .join(largeValues, smallValues("key") === largeValues("key"))
  .withColumn("diff", smallValues("value") - largeValues("value"))
  .show
15/10/20 16:59:59 WARN Column: Constructing trivially true equals
predicate, 'key#41 = key#41'. Perhaps you need to use aliases.
+---+-----+---+-----+----+
|key|value|key|value|diff|
+---+-----+---+-----+----+
|  1|    1|  1|   10|   0|
|  3|    5|  3|   20|   0|
+---+-----+---+-----+----+


This last command issued a warning, but still executed the join correctly
(rows with key 2 and 4 don't appear in result set). However, the "diff"
column is incorrect.

Is this a bug or am I missing something here?


Thanks a lot for any input,

Isabelle

Re: How to distinguish columns when joining DataFrames with shared parent?

Posted by Xiao Li <ga...@gmail.com>.
Actually, I found a design issue in self joins. When there are multiple
layers of projection above an alias, the relation between the alias and the
actual columns is lost. Thus, when resolving the alias in self joins, the
rules treat the alias (e.g., in a Projection) as a normal column. This
only happens when using DataFrames. When using SQL, the duplicate names
after a self join will prevent another self join.

We need a mechanism to trace back the original/actual column for each
alias, like what RDBMS optimizers do. The most efficient way is to
store the alias information directly in the node to indicate whether it
comes from an alias; otherwise, we need to traverse the underlying tree for
each column to confirm that it does not come from an alias.

Good luck,

Xiao Li


Re: How to distinguish columns when joining DataFrames with shared parent?

Posted by Isabelle Phan <nl...@gmail.com>.
Ok, got it.
Thanks a lot Michael for the detailed reply!

Re: How to distinguish columns when joining DataFrames with shared parent?

Posted by Michael Armbrust <mi...@databricks.com>.
Yeah, I was suggesting that you avoid using
org.apache.spark.sql.DataFrame.apply(colName: String) when you are working
with self joins, as it eagerly binds to a specific column in a way that
breaks when we do the rewrite of one side of the query.  Using the apply
method constructs a resolved column eagerly (which loses the alias
information).


Re: How to distinguish columns when joining DataFrames with shared parent?

Posted by Isabelle Phan <nl...@gmail.com>.
Thanks Michael and Ali for the reply!

I'll make sure to use unresolved columns when working with self joins then.

As pointed out by Ali, isn't there still an issue with the aliasing? It works
when using the org.apache.spark.sql.functions.col(colName: String) method, but
not when using org.apache.spark.sql.DataFrame.apply(colName: String):

scala> j.select(col("lv.value")).show
+-----+
|value|
+-----+
|   10|
|   20|
+-----+


scala> j.select(largeValues("lv.value")).show
+-----+
|value|
+-----+
|    1|
|    5|
+-----+

Or does this behavior have the same root cause as detailed in Michael's
email?


-Isabelle





Re: How to distinguish columns when joining DataFrames with shared parent?

Posted by Michael Armbrust <mi...@databricks.com>.
Unfortunately, the mechanisms that we use to differentiate columns
automatically don't work particularly well in the presence of self joins.
However, you can get it to work if you use the $"column" syntax consistently:

val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", "value")
val smallValues = df.filter('value < 10).as("sv")
val largeValues = df.filter('value >= 10).as("lv")

smallValues
  .join(largeValues, $"sv.key" === $"lv.key")
  .select($"sv.key".as("key"), $"sv.value".as("small_value"), $"lv.value".as("large_value"))
  .withColumn("diff", $"small_value" - $"large_value")
  .show()
+---+-----------+-----------+----+
|key|small_value|large_value|diff|
+---+-----------+-----------+----+
|  1|          1|         10|  -9|
|  3|          5|         20| -15|
+---+-----------+-----------+----+


The problem with the other cases is that calling smallValues("columnName")
or largeValues("columnName") eagerly resolves the attribute to the same
column (since the data is actually coming from the same place).  By the
time we realize that you are joining the data with itself (at which point
we rewrite one side of the join to use different expression ids) it's too
late.  At the core, the problem is that in Scala we have no easy way to
differentiate largeValues("columnName") from smallValues("columnName").
This is because the data is coming from the same DataFrame and we don't
actually know which variable name you are using.  There are things we can
change here, but it's pretty hard to change the semantics without breaking
other use cases.
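
The eager-resolution problem described above can be illustrated with a toy model in plain Scala. This is only a sketch and does not use Spark's real classes: Attr, Frame, filtered and withFreshIds are hypothetical names, the ids are made up (41 echoes the key#41 in the warning), and the choice to rewrite the right-hand side is an assumption that matches the results reported in this thread.

```scala
object EagerResolutionSketch {
  // Toy stand-ins, NOT Spark's real classes: an attribute is a name plus an id.
  final case class Attr(name: String, id: Int)

  final case class Frame(output: Seq[Attr]) {
    // Filtering drops rows, not columns, so the attributes and ids are unchanged.
    def filtered: Frame = Frame(output)

    // Eager lookup, analogous in spirit to DataFrame.apply(colName).
    def apply(name: String): Attr =
      output.find(_.name == name).getOrElse(sys.error(s"no column $name"))

    // Hypothetical self-join rewrite: give every attribute a different id.
    def withFreshIds(offset: Int): Frame =
      Frame(output.map(a => a.copy(id = a.id + offset)))
  }

  val df = Frame(Seq(Attr("key", 41), Attr("value", 42))) // made-up ids
  val smallValues = df.filtered
  val largeValues = df.filtered

  // Columns captured before the join, as in
  // smallValues("value") - largeValues("value").
  val left = smallValues("value")
  val right = largeValues("value")

  // Assumed rewrite: the join keeps the left ids and re-ids the right side.
  val joinedOutput: Seq[Attr] =
    smallValues.output ++ largeValues.withFreshIds(100).output

  def main(args: Array[String]): Unit = {
    println(left == right)               // true: both point at value#42
    println(joinedOutput.indexOf(right)) // 1: the LEFT side's value column
  }
}
```

In this model the column captured from largeValues still carries the old id, so after the rewrite it can only match the left side. That is consistent with the "diff" of 0 and with largeValues("value") showing the small values.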

So, this isn't a straightforward "bug", but it's definitely a usability
issue.  For now, my advice would be: only use unresolved columns (i.e.
$"[alias.]column" or col("[alias.]column")) when working with self joins.

Michael
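
For readers who prefer the col("[alias.]column") form mentioned in the advice above, here is a minimal sketch of the same self join written with org.apache.spark.sql.functions.col instead of $"...". It assumes a Spark 1.5-style SQLContext; the object name SelfJoinWithCol, the method diffBySelfJoin and the local master setting are illustrative choices, not something from the thread. In spark-shell, sc and sqlContext already exist and only the body of diffBySelfJoin is needed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.col

object SelfJoinWithCol {
  // Builds the joined result using only unresolved, alias-qualified columns.
  def diffBySelfJoin(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._ // needed for toDF on a local Seq

    val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", "value")

    // Alias each side so that columns can be referred to as sv.* and lv.*.
    val smallValues = df.filter(col("value") < 10).as("sv")
    val largeValues = df.filter(col("value") >= 10).as("lv")

    smallValues
      .join(largeValues, col("sv.key") === col("lv.key"))
      .select(
        col("sv.key").as("key"),
        col("sv.value").as("small_value"),
        col("lv.value").as("large_value"))
      .withColumn("diff", col("small_value") - col("large_value"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local context, for illustration only.
    val conf = new SparkConf().setAppName("self-join-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try diffBySelfJoin(new SQLContext(sc)).show()
    finally sc.stop()
  }
}
```

If the advice in this message holds, the result should match the table shown in the message (key 1 and key 3 with their small and large values). Neither smallValues(...) nor largeValues(...) is ever called, so no column is bound before the join is analyzed.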

Re: How to distinguish columns when joining DataFrames with shared parent?

Posted by Ali Tajeldin EDU <al...@gmail.com>.
Furthermore, even adding aliasing as suggested by the warning doesn't seem to help either.  Slight modification to example below:

> scala> val largeValues = df.filter('value >= 10).as("lv")

And just looking at the join results:
> scala> val j = smallValues
>   .join(largeValues, smallValues("key") === largeValues("key"))

scala> j.select($"value").show
This throws an exception indicating that "value" is ambiguous (as expected).

scala> j.select(smallValues("value")).show
This shows the left (small values) "value" column, as expected.

scala> j.select(largeValues("value")).show
This also shows the left (small values) "value" column (resolved to the wrong column).

scala> j.select(largeValues("lv.value")).show
This shows the left (small values) "value" column again (resolved to the wrong column even though we explicitly specified the alias and used the right-hand DataFrame).

scala> j.select($"lv.value").show
This produces a "cannot resolve 'lv.value'" exception (so the lv alias is not preserved in the join result).

Does anyone know the appropriate way to use aliases in DataFrame operations, or is this a bug?
--
Ali

