You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Enrico Minack <ma...@Enrico.Minack.dev> on 2020/02/08 15:56:20 UTC

Fwd: dataframe null safe joins given a list of columns

Hi Devs,

I am forwarding this from the user mailing list. I agree that the <=> 
version of join(Dataset[_], Seq[String]) would be useful.

Does any PMC consider this useful enough to be added to the Dataset API? 
I'd be happy to create a PR in that case.

Enrico



-------- Weitergeleitete Nachricht --------
Betreff: 	dataframe null safe joins given a list of columns
Datum: 	Thu, 6 Feb 2020 12:45:11 +0000
Von: 	Marcelo Valle <ma...@ktech.com>
An: 	user @spark <us...@spark.apache.org>



I was surprised I couldn't find a way of solving this in spark, as it 
must be a very common problem for users. Then I decided to ask here.

Consider the code bellow:

```
val joinColumns = Seq("a", "b")
val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, 
"c4")).toDF("a", "b", "c")
val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, 
"d4")).toDF("a", "b", "d")
df1.join(df2, joinColumns).show()
```

The output is :

```
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
| a1| b1| c1| d1|
+---+---+---+---+
```

But I want it to be:

```
+---+-----+---+---+
|  a|    b|  c|  d|
+---+-----+---+---+
| a1|   b1| c1| d1|
| a4| null| c4| d4|
+---+-----+---+---+
```

The join syntax of `df1.join(df2, joinColumns)` has some advantages, as 
it doesn't create duplicate columns by default. However, it uses the 
operator `===` to join, not the null safe one `<=>`.

Using the following syntax:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
```

Would produce:

```
+---+----+---+---+----+---+
|  a|   b|  c|  a|   b|  d|
+---+----+---+---+----+---+
| a1|  b1| c1| a1|  b1| d1|
| a4|null| c4| a4|null| d4|
+---+----+---+---+----+---+
```

So to get the result I really want, I must do:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> 
df2("b")).drop(df2("a")).drop(df2("b")).show()
+---+----+---+---+
|  a|   b|  c|  d|
+---+----+---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---+----+---+---+
```

Which works, but is really verbose, especially when you have many join 
columns.

Is there a better way of solving this without needing a utility method? 
This same problem is something I find in every spark project.



This email is confidential [and may be protected by legal privilege]. If 
you are not the intended recipient, please do not copy or disclose its 
content but contact the sender immediately upon receipt.

KTech Services Ltd is registered in England as company number 10704940.

Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE, 
United Kingdom


Re: [SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

Posted by Alexandros Biratsis <al...@gmail.com>.
Hi Enrico and Spark devs,

Since the current plan is not to provide a built-in functionality for
dropping repeated/redundant columns, I wrote two helper methods as a
workaround solution.

The 1st method supports multiple Column instances extending the current drop
<https://github.com/apache/spark/blob/ddd8d5f5a0b6db17babc201ba4b73f7df91df1a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2477>
which
supports column names only:

implicit class DataframeExt(val df: DataFrame) {
  def drop(cols: Seq[Column]) : DataFrame = {
    cols.foldLeft(df){
      (tdf, c) => tdf.drop(c)
    }
  }
}

2nd implicit method which converts a sequence of column names into Column
instances, optionally binding them to the parent dataframes:

implicit class SeqExt(val cols: Seq[String]) {
  def toCol(dfs: DataFrame*) : Seq[Column] = {
    if(dfs.nonEmpty) {
      dfs.foldLeft(Seq[Column]()) {
        (acc, df) => acc ++ cols.map {df(_)}
      }
    }
    else{
      cols.map {col(_)}
    }
  }
}

After adding these two to your library you can use it as:

import implicits._

val dropCols = Seq("c2", "c3")
val joinCols = Seq("c1")

val weatherDf = dfA.join(dfB, joinCols, "inner")
                                 .join(dfC, joinCols, "inner")
                                 .join(dfD, joinCols, "inner")
                                 .drop(dropCols.toCol(dfB, dfC, dfD))

Cheers,
Alex

On Wed, Feb 26, 2020 at 10:07 AM Enrico Minack <ma...@enrico.minack.dev>
wrote:

> I have created a jira to track this request:
> https://issues.apache.org/jira/browse/SPARK-30957
>
> Enrico
>
> Am 08.02.20 um 16:56 schrieb Enrico Minack:
>
> Hi Devs,
>
> I am forwarding this from the user mailing list. I agree that the <=>
> version of join(Dataset[_], Seq[String]) would be useful.
>
> Does any PMC consider this useful enough to be added to the Dataset API?
> I'd be happy to create a PR in that case.
>
> Enrico
>
>
> -------- Weitergeleitete Nachricht --------
> Betreff: dataframe null safe joins given a list of columns
> Datum: Thu, 6 Feb 2020 12:45:11 +0000
> Von: Marcelo Valle <ma...@ktech.com> <ma...@ktech.com>
> An: user @spark <us...@spark.apache.org> <us...@spark.apache.org>
>
> I was surprised I couldn't find a way of solving this in spark, as it must
> be a very common problem for users. Then I decided to ask here.
>
> Consider the code bellow:
>
> ```
> val joinColumns = Seq("a", "b")
> val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null,
> "c4")).toDF("a", "b", "c")
> val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null,
> "d4")).toDF("a", "b", "d")
> df1.join(df2, joinColumns).show()
> ```
>
> The output is :
>
> ```
> +---+---+---+---+
> |  a|  b|  c|  d|
> +---+---+---+---+
> | a1| b1| c1| d1|
> +---+---+---+---+
> ```
>
> But I want it to be:
>
> ```
> +---+-----+---+---+
> |  a|    b|  c|  d|
> +---+-----+---+---+
> | a1|   b1| c1| d1|
> | a4| null| c4| d4|
> +---+-----+---+---+
> ```
>
> The join syntax of `df1.join(df2, joinColumns)` has some advantages, as it
> doesn't create duplicate columns by default. However, it uses the operator
> `===` to join, not the null safe one `<=>`.
>
> Using the following syntax:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
> ```
>
> Would produce:
>
> ```
> +---+----+---+---+----+---+
> |  a|   b|  c|  a|   b|  d|
> +---+----+---+---+----+---+
> | a1|  b1| c1| a1|  b1| d1|
> | a4|null| c4| a4|null| d4|
> +---+----+---+---+----+---+
> ```
>
> So to get the result I really want, I must do:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=>
> df2("b")).drop(df2("a")).drop(df2("b")).show()
> +---+----+---+---+
> |  a|   b|  c|  d|
> +---+----+---+---+
> | a1|  b1| c1| d1|
> | a4|null| c4| d4|
> +---+----+---+---+
> ```
>
> Which works, but is really verbose, especially when you have many join
> columns.
>
> Is there a better way of solving this without needing a utility method?
> This same problem is something I find in every spark project.
>
>
>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>
>
>

Re: [SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

Posted by Alexandros Biratsis <al...@gmail.com>.
Hi Enrico and Spark devs,

Since the current plan is not to provide a built-in functionality for
dropping repeated/redundant columns, I wrote two helper methods as a
workaround solution.

The 1st method supports multiple Column instances extending the current drop
<https://github.com/apache/spark/blob/ddd8d5f5a0b6db17babc201ba4b73f7df91df1a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2477>
which
supports column names only:

implicit class DataframeExt(val df: DataFrame) {
  def drop(cols: Seq[Column]) : DataFrame = {
    cols.foldLeft(df){
      (tdf, c) => tdf.drop(c)
    }
  }
}

2nd implicit method which converts a sequence of column names into Column
instances, optionally binding them to the parent dataframes:

implicit class SeqExt(val cols: Seq[String]) {
  def toCol(dfs: DataFrame*) : Seq[Column] = {
    if(dfs.nonEmpty) {
      dfs.foldLeft(Seq[Column]()) {
        (acc, df) => acc ++ cols.map {df(_)}
      }
    }
    else{
      cols.map {col(_)}
    }
  }
}

After adding these two to your library you can use it as:

import implicits._

val dropCols = Seq("c2", "c3")
val joinCols = Seq("c1")

val weatherDf = dfA.join(dfB, joinCols, "inner")
                                 .join(dfC, joinCols, "inner")
                                 .join(dfD, joinCols, "inner")
                                 .drop(dropCols.toCol(dfB, dfC, dfD))

Cheers,
Alex

On Wed, Feb 26, 2020 at 10:07 AM Enrico Minack <ma...@enrico.minack.dev>
wrote:

> I have created a jira to track this request:
> https://issues.apache.org/jira/browse/SPARK-30957
>
> Enrico
>
> Am 08.02.20 um 16:56 schrieb Enrico Minack:
>
> Hi Devs,
>
> I am forwarding this from the user mailing list. I agree that the <=>
> version of join(Dataset[_], Seq[String]) would be useful.
>
> Does any PMC consider this useful enough to be added to the Dataset API?
> I'd be happy to create a PR in that case.
>
> Enrico
>
>
> -------- Weitergeleitete Nachricht --------
> Betreff: dataframe null safe joins given a list of columns
> Datum: Thu, 6 Feb 2020 12:45:11 +0000
> Von: Marcelo Valle <ma...@ktech.com> <ma...@ktech.com>
> An: user @spark <us...@spark.apache.org> <us...@spark.apache.org>
>
> I was surprised I couldn't find a way of solving this in spark, as it must
> be a very common problem for users. Then I decided to ask here.
>
> Consider the code bellow:
>
> ```
> val joinColumns = Seq("a", "b")
> val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null,
> "c4")).toDF("a", "b", "c")
> val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null,
> "d4")).toDF("a", "b", "d")
> df1.join(df2, joinColumns).show()
> ```
>
> The output is :
>
> ```
> +---+---+---+---+
> |  a|  b|  c|  d|
> +---+---+---+---+
> | a1| b1| c1| d1|
> +---+---+---+---+
> ```
>
> But I want it to be:
>
> ```
> +---+-----+---+---+
> |  a|    b|  c|  d|
> +---+-----+---+---+
> | a1|   b1| c1| d1|
> | a4| null| c4| d4|
> +---+-----+---+---+
> ```
>
> The join syntax of `df1.join(df2, joinColumns)` has some advantages, as it
> doesn't create duplicate columns by default. However, it uses the operator
> `===` to join, not the null safe one `<=>`.
>
> Using the following syntax:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
> ```
>
> Would produce:
>
> ```
> +---+----+---+---+----+---+
> |  a|   b|  c|  a|   b|  d|
> +---+----+---+---+----+---+
> | a1|  b1| c1| a1|  b1| d1|
> | a4|null| c4| a4|null| d4|
> +---+----+---+---+----+---+
> ```
>
> So to get the result I really want, I must do:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=>
> df2("b")).drop(df2("a")).drop(df2("b")).show()
> +---+----+---+---+
> |  a|   b|  c|  d|
> +---+----+---+---+
> | a1|  b1| c1| d1|
> | a4|null| c4| d4|
> +---+----+---+---+
> ```
>
> Which works, but is really verbose, especially when you have many join
> columns.
>
> Is there a better way of solving this without needing a utility method?
> This same problem is something I find in every spark project.
>
>
>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>
>
>

[SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

Posted by Enrico Minack <ma...@Enrico.Minack.dev>.
I have created a jira to track this request: 
https://issues.apache.org/jira/browse/SPARK-30957

Enrico

Am 08.02.20 um 16:56 schrieb Enrico Minack:
>
> Hi Devs,
>
> I am forwarding this from the user mailing list. I agree that the <=> 
> version of join(Dataset[_], Seq[String]) would be useful.
>
> Does any PMC consider this useful enough to be added to the Dataset 
> API? I'd be happy to create a PR in that case.
>
> Enrico
>
>
>
> -------- Weitergeleitete Nachricht --------
> Betreff: 	dataframe null safe joins given a list of columns
> Datum: 	Thu, 6 Feb 2020 12:45:11 +0000
> Von: 	Marcelo Valle <ma...@ktech.com>
> An: 	user @spark <us...@spark.apache.org>
>
>
>
> I was surprised I couldn't find a way of solving this in spark, as it 
> must be a very common problem for users. Then I decided to ask here.
>
> Consider the code bellow:
>
> ```
> val joinColumns = Seq("a", "b")
> val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, 
> "c4")).toDF("a", "b", "c")
> val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, 
> "d4")).toDF("a", "b", "d")
> df1.join(df2, joinColumns).show()
> ```
>
> The output is :
>
> ```
> +---+---+---+---+
> |  a|  b|  c|  d|
> +---+---+---+---+
> | a1| b1| c1| d1|
> +---+---+---+---+
> ```
>
> But I want it to be:
>
> ```
> +---+-----+---+---+
> |  a|    b|  c|  d|
> +---+-----+---+---+
> | a1|   b1| c1| d1|
> | a4| null| c4| d4|
> +---+-----+---+---+
> ```
>
> The join syntax of `df1.join(df2, joinColumns)` has some advantages, 
> as it doesn't create duplicate columns by default. However, it uses 
> the operator `===` to join, not the null safe one `<=>`.
>
> Using the following syntax:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
> ```
>
> Would produce:
>
> ```
> +---+----+---+---+----+---+
> |  a|   b|  c|  a|   b|  d|
> +---+----+---+---+----+---+
> | a1|  b1| c1| a1|  b1| d1|
> | a4|null| c4| a4|null| d4|
> +---+----+---+---+----+---+
> ```
>
> So to get the result I really want, I must do:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> 
> df2("b")).drop(df2("a")).drop(df2("b")).show()
> +---+----+---+---+
> |  a|   b|  c|  d|
> +---+----+---+---+
> | a1|  b1| c1| d1|
> | a4|null| c4| d4|
> +---+----+---+---+
> ```
>
> Which works, but is really verbose, especially when you have many join 
> columns.
>
> Is there a better way of solving this without needing a 
> utility method? This same problem is something I find in every spark 
> project.
>
>
>
> This email is confidential [and may be protected by legal privilege]. 
> If you are not the intended recipient, please do not copy or disclose 
> its content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE, 
> United Kingdom
>


[SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

Posted by Enrico Minack <ma...@Enrico.Minack.dev>.
I have created a jira to track this request: 
https://issues.apache.org/jira/browse/SPARK-30957

Enrico

Am 08.02.20 um 16:56 schrieb Enrico Minack:
>
> Hi Devs,
>
> I am forwarding this from the user mailing list. I agree that the <=> 
> version of join(Dataset[_], Seq[String]) would be useful.
>
> Does any PMC consider this useful enough to be added to the Dataset 
> API? I'd be happy to create a PR in that case.
>
> Enrico
>
>
>
> -------- Weitergeleitete Nachricht --------
> Betreff: 	dataframe null safe joins given a list of columns
> Datum: 	Thu, 6 Feb 2020 12:45:11 +0000
> Von: 	Marcelo Valle <ma...@ktech.com>
> An: 	user @spark <us...@spark.apache.org>
>
>
>
> I was surprised I couldn't find a way of solving this in spark, as it 
> must be a very common problem for users. Then I decided to ask here.
>
> Consider the code bellow:
>
> ```
> val joinColumns = Seq("a", "b")
> val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, 
> "c4")).toDF("a", "b", "c")
> val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, 
> "d4")).toDF("a", "b", "d")
> df1.join(df2, joinColumns).show()
> ```
>
> The output is :
>
> ```
> +---+---+---+---+
> |  a|  b|  c|  d|
> +---+---+---+---+
> | a1| b1| c1| d1|
> +---+---+---+---+
> ```
>
> But I want it to be:
>
> ```
> +---+-----+---+---+
> |  a|    b|  c|  d|
> +---+-----+---+---+
> | a1|   b1| c1| d1|
> | a4| null| c4| d4|
> +---+-----+---+---+
> ```
>
> The join syntax of `df1.join(df2, joinColumns)` has some advantages, 
> as it doesn't create duplicate columns by default. However, it uses 
> the operator `===` to join, not the null safe one `<=>`.
>
> Using the following syntax:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
> ```
>
> Would produce:
>
> ```
> +---+----+---+---+----+---+
> |  a|   b|  c|  a|   b|  d|
> +---+----+---+---+----+---+
> | a1|  b1| c1| a1|  b1| d1|
> | a4|null| c4| a4|null| d4|
> +---+----+---+---+----+---+
> ```
>
> So to get the result I really want, I must do:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> 
> df2("b")).drop(df2("a")).drop(df2("b")).show()
> +---+----+---+---+
> |  a|   b|  c|  d|
> +---+----+---+---+
> | a1|  b1| c1| d1|
> | a4|null| c4| d4|
> +---+----+---+---+
> ```
>
> Which works, but is really verbose, especially when you have many join 
> columns.
>
> Is there a better way of solving this without needing a 
> utility method? This same problem is something I find in every spark 
> project.
>
>
>
> This email is confidential [and may be protected by legal privilege]. 
> If you are not the intended recipient, please do not copy or disclose 
> its content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE, 
> United Kingdom
>