You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Alexandros Biratsis <al...@gmail.com> on 2020/06/09 10:59:47 UTC

Re: [SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

Hi Enrico and Spark devs,

Since the current plan is not to provide a built-in functionality for
dropping repeated/redundant columns, I wrote two helper methods as a
workaround solution.

The 1st method supports multiple Column instances extending the current drop
<https://github.com/apache/spark/blob/ddd8d5f5a0b6db17babc201ba4b73f7df91df1a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2477>
which
supports column names only:

implicit class DataframeExt(val df: DataFrame) {
  def drop(cols: Seq[Column]) : DataFrame = {
    cols.foldLeft(df){
      (tdf, c) => tdf.drop(c)
    }
  }
}

2nd implicit method which converts a sequence of column names into Column
instances, optionally binding them to the parent dataframes:

implicit class SeqExt(val cols: Seq[String]) {
  def toCol(dfs: DataFrame*) : Seq[Column] = {
    if(dfs.nonEmpty) {
      dfs.foldLeft(Seq[Column]()) {
        (acc, df) => acc ++ cols.map {df(_)}
      }
    }
    else{
      cols.map {col(_)}
    }
  }
}

After adding these two to your library you can use it as:

import implicits._

val dropCols = Seq("c2", "c3")
val joinCols = Seq("c1")

val weatherDf = dfA.join(dfB, joinCols, "inner")
                                 .join(dfC, joinCols, "inner")
                                 .join(dfD, joinCols, "inner")
                                 .drop(dropCols.toCol(dfB, dfC, dfD))

Cheers,
Alex

On Wed, Feb 26, 2020 at 10:07 AM Enrico Minack <ma...@enrico.minack.dev>
wrote:

> I have created a jira to track this request:
> https://issues.apache.org/jira/browse/SPARK-30957
>
> Enrico
>
> Am 08.02.20 um 16:56 schrieb Enrico Minack:
>
> Hi Devs,
>
> I am forwarding this from the user mailing list. I agree that the <=>
> version of join(Dataset[_], Seq[String]) would be useful.
>
> Does any PMC consider this useful enough to be added to the Dataset API?
> I'd be happy to create a PR in that case.
>
> Enrico
>
>
> -------- Weitergeleitete Nachricht --------
> Betreff: dataframe null safe joins given a list of columns
> Datum: Thu, 6 Feb 2020 12:45:11 +0000
> Von: Marcelo Valle <ma...@ktech.com> <ma...@ktech.com>
> An: user @spark <us...@spark.apache.org> <us...@spark.apache.org>
>
> I was surprised I couldn't find a way of solving this in spark, as it must
> be a very common problem for users. Then I decided to ask here.
>
> Consider the code bellow:
>
> ```
> val joinColumns = Seq("a", "b")
> val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null,
> "c4")).toDF("a", "b", "c")
> val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null,
> "d4")).toDF("a", "b", "d")
> df1.join(df2, joinColumns).show()
> ```
>
> The output is :
>
> ```
> +---+---+---+---+
> |  a|  b|  c|  d|
> +---+---+---+---+
> | a1| b1| c1| d1|
> +---+---+---+---+
> ```
>
> But I want it to be:
>
> ```
> +---+-----+---+---+
> |  a|    b|  c|  d|
> +---+-----+---+---+
> | a1|   b1| c1| d1|
> | a4| null| c4| d4|
> +---+-----+---+---+
> ```
>
> The join syntax of `df1.join(df2, joinColumns)` has some advantages, as it
> doesn't create duplicate columns by default. However, it uses the operator
> `===` to join, not the null safe one `<=>`.
>
> Using the following syntax:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
> ```
>
> Would produce:
>
> ```
> +---+----+---+---+----+---+
> |  a|   b|  c|  a|   b|  d|
> +---+----+---+---+----+---+
> | a1|  b1| c1| a1|  b1| d1|
> | a4|null| c4| a4|null| d4|
> +---+----+---+---+----+---+
> ```
>
> So to get the result I really want, I must do:
>
> ```
> df1.join(df2, df1("a") <=> df2("a") && df1("b") <=>
> df2("b")).drop(df2("a")).drop(df2("b")).show()
> +---+----+---+---+
> |  a|   b|  c|  d|
> +---+----+---+---+
> | a1|  b1| c1| d1|
> | a4|null| c4| d4|
> +---+----+---+---+
> ```
>
> Which works, but is really verbose, especially when you have many join
> columns.
>
> Is there a better way of solving this without needing a utility method?
> This same problem is something I find in every spark project.
>
>
>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>
>
>