Posted to user@spark.apache.org by Martin Senne <ma...@googlemail.com> on 2015/08/01 13:13:21 UTC

Re: Spark SQL DataFrame: Nullable column and filtering

Dear all,

after some fiddling I have arrived at this solution:

import org.apache.spark.sql.DataFrame

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
  // Alias both sides so that equally named columns can be told apart after the join.
  val joinedDF = leftDF.as('left).join(rightDF.as('right),
    leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  // All columns of the left side, qualified by the "left" alias.
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  // All columns of the right side except the join column, qualified by the "right" alias.
  val rightColumns = rightDF.columns
    .filterNot(cn => cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}
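
The select list that the function above builds can be illustrated without Spark. What follows is only a minimal sketch: qualifiedSelectList is a hypothetical helper, not a Spark API, and it works on plain column names rather than Columns. The column names are those of the Record(x, a) / Mapping(x, y) example quoted further down in this thread.

```scala
// Hypothetical helper (not part of Spark): computes the alias-qualified
// names that the join function above turns into Columns and passes to select.
def qualifiedSelectList(
    leftColumns: Seq[String],
    rightColumns: Seq[String],
    commonColumnName: String): Seq[String] = {
  // Every left column is kept, including the join column.
  val left = leftColumns.map(cn => s"left.$cn")
  // The right side contributes everything except the duplicated join column.
  val right = rightColumns.filterNot(_ == commonColumnName).map(cn => s"right.$cn")
  left ++ right
}

// Column names of Record(x, a) and Mapping(x, y), joined on "x".
val selected = qualifiedSelectList(Seq("x", "a"), Seq("x", "y"), "x")
println(selected.mkString(", ")) // prints: left.x, left.a, right.y
```

In the Spark version each of these names is wrapped by the $"..." interpolator; wrapping them with org.apache.spark.sql.functions.col should work as well.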

Comments welcome!!!!

Alternatives I tried:

   - Not working: If at least the right alias for rightDF is present, one
   could try

   joinedDF.drop("right." + commonColumnName)

   but this does not work (no column is dropped).
   Unfortunately, drop does not support arguments of type Column /
   ColumnNames. *@Michael: Should I create a feature request in Jira for
   drop supporting Columns?*

   - Working: Without using aliases via as(...), but using column
   renaming instead:

   rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)

   to rename the right DataFrame column, and then use the join criterion

   leftDF(commonColumnName) === rightDF("right_" + commonColumnName)

   where rightDF now refers to the renamed DataFrame.
   In my opinion not so neat. Opinions?
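
The renaming variant can be sketched as a complete function. This is a minimal sketch under stated assumptions, not a definitive implementation: the names leftOuterJoinViaRename and renamedRightDF are made up for illustration, and the code assumes that leftDF has no column already called "right_" + commonColumnName. It uses only withColumnRenamed, join and drop(String) from the DataFrame API.

```scala
import org.apache.spark.sql.DataFrame

/**
 * Left outer join on a common column that avoids aliases by renaming
 * the join column of the right side first (hypothetical helper).
 */
def leftOuterJoinViaRename(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
  // Assumption: this name does not collide with any existing column of leftDF.
  val renamedColumnName = "right_" + commonColumnName
  val renamedRightDF = rightDF.withColumnRenamed(commonColumnName, renamedColumnName)

  leftDF
    .join(renamedRightDF, leftDF(commonColumnName) === renamedRightDF(renamedColumnName), "leftouter")
    // After the rename the column name is unique, so drop by name removes only the right join column.
    .drop(renamedColumnName)
}
```

Because the remaining columns are taken from the joined DataFrame rather than from rightDF, the nullable flag computed by the join should be kept for them.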


Things I observed:

   - Column handling does not seem consistent:
      - select(...) supports aliases, while drop(...) only supports
      strings.
      - DataFrame.apply(...) and DataFrame.col also do not support aliases.
      - Thus, the only way to handle ambiguous column names is via select at
      the moment. Can someone please confirm this?
      - Alias information is not displayed via DataFrame.printSchema (or
      at least I did not find a way to do so).

Cheers,

Martin

2015-07-31 22:51 GMT+02:00 Martin Senne <ma...@googlemail.com>:

> Dear Michael, dear all,
>
> a minimal example is listed below.
>
> After some further analysis, I figured out that the problem is
> related to the *leftOuterJoinWithRemovalOfEqualColumn* method, as I use
> columns of the left and right DataFrames when doing the select on the
> joined table.
>
>   /**
>    * Customized left outer join on common column.
>    */
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>       .select(leftColumns ++ rightColumns: _*)
>   }
>
> As the column "y" of the right table has nullable=false, this is then also transferred to the "y" column of the joined table, as I use rightDF("y").
>
> Thus, I need to use columns of the joined table for the select.
>
> *Question now: The joined table has column names "x", "a", "x", "y". How do I discard the second x column?*
>
> All my approaches failed (assuming here that joinedDF is the joined DataFrame):
>
>
>    - Using joinedDF.drop("x") discards both "x" columns.
>    - Using joinedDF("x") does not work, as it is ambiguous.
>    - Also, using rightDF.as("aliasname") in order to differentiate the
>    column "x" (from the left DataFrame) from "x" (from the right DataFrame) did not
>    work out, as I found no way to use select($"aliasname.x") really
>    programmatically. Could someone sketch the code?
>
> Any help welcome, thanks
>
>
> Martin
>
>
>
> ========================================
> import org.apache.spark.sql.types.{StructField, StructType}
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.{DataFrame, SQLContext}
>
> object OtherEntities {
>
>   case class Record( x:Int, a: String)
>   case class Mapping( x: Int, y: Int )
>
>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>   val mappings = Seq( Mapping(2, 5) )
> }
>
> object MinimalShowcase {
>
>   /**
>    * Customized left outer join on common column.
>    */
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>       .select(leftColumns ++ rightColumns: _*)
>   }
>
>
>   /**
>    * Set, if a column is nullable.
>    * @param df source DataFrame
>    * @param cn is the column name to change
>    * @param nullable is the flag to set, such that the column is either nullable or not
>    */
>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
>
>     val schema = df.schema
>     val newSchema = StructType(schema.map {
>       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
>       case y: StructField => y
>     })
>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>   }
>
>
>   def main (args: Array[String]) {
>     val conf = new SparkConf()
>       .setAppName("Minimal")
>       .setMaster("local[*]")
>
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     // used to implicitly convert an RDD to a DataFrame.
>     import sqlContext.implicits._
>
>     val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
>     val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>     val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)
>
>     val joinedDF = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "leftouter")
>     println("joinedDF:")
>     joinedDF.show
>     joinedDF.printSchema
>     joinedDF.filter(joinedDF("y").isNotNull).show
>
> //    joinedDF:
> //    +-+-----+----+----+
> //    |x|    a|   x|   y|
> //    +-+-----+----+----+
> //    |1|hello|null|null|
> //    |2|  bob|   2|   5|
> //    +-+-----+----+----+
> //
> //    root
> //    |-- x: integer (nullable = false)
> //    |-- a: string (nullable = true)
> //    |-- x: integer (nullable = true)
> //    |-- y: integer (nullable = true)
> //
> //    +-+---+-+-+
> //    |x|  a|x|y|
> //    +-+---+-+-+
> //    |2|bob|2|5|
> //    +-+---+-+-+
>
>
>     val extrajoinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
>     println("extrajoinedDF:")
>     extrajoinedDF.show
>     extrajoinedDF.printSchema
>     extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show
>
> //    extrajoinedDF:
> //    +-+-----+----+
> //    |x|    a|   y|
> //    +-+-----+----+
> //    |1|hello|null|
> //    |2|  bob|   5|
> //    +-+-----+----+
> //
> //    root
> //    |-- x: integer (nullable = false)
> //    |-- a: string (nullable = true)
> //    |-- y: integer (nullable = false)
> //
> //    +-+-----+----+
> //    |x|    a|   y|
> //    +-+-----+----+
> //    |1|hello|null|
> //    |2|  bob|   5|
> //    +-+-----+----+
>
>
>
>     val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") === mappingWithNullDF("x"), "leftouter")
>     println("joined2DF:")
>     joined2DF.show
>     joined2DF.printSchema
>     joined2DF.filter(joined2DF("y").isNotNull).show
>
> //    joined2DF:
> //    +-+-----+----+----+
> //    |x|    a|   x|   y|
> //    +-+-----+----+----+
> //    |1|hello|null|null|
> //    |2|  bob|   2|   5|
> //    +-+-----+----+----+
> //
> //    root
> //    |-- x: integer (nullable = false)
> //    |-- a: string (nullable = true)
> //    |-- x: integer (nullable = true)
> //    |-- y: integer (nullable = true)
> //
> //    +-+---+-+-+
> //    |x|  a|x|y|
> //    +-+---+-+-+
> //    |2|bob|2|5|
> //    +-+---+-+-+
>
>   }
> }
>
>
>
> 2015-07-31 1:56 GMT+02:00 Martin Senne <ma...@googlemail.com>:
>
>> Dear Michael, dear all,
>>
>> distinguishing those records that have a match in mapping from those that
>> don't is the crucial point.
>>
>> Record(x : Int,  a: String)
>> Mapping(x: Int, y: Int)
>>
>> Thus
>>
>> Record(1, "hello")
>> Record(2, "bob")
>> Mapping(2, 5)
>>
>> yield (2, "bob", 5) on an inner join.
>> BUT I'm also interested in (1, "hello", null) as there is no counterpart
>> in mapping (this is the left outer join part)
>>
>> I need to distinguish 1 and 2 because of later inserts (case 1, hello) or
>> updates (case 2, bob).
>>
>> Cheers and thanks,
>>
>> Martin
>>
>> On 30.07.2015 22:58, "Michael Armbrust" <mi...@databricks.com> wrote:
>> >
>> > Perhaps I'm missing what you are trying to accomplish, but if you'd
>> like to avoid the null values do an inner join instead of an outer join.
>> >
>> > Additionally, I'm confused about how the result
>> of joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
>> in the column y. This doesn't really have anything to do with nullable,
>> which is only a hint to the system so that we can avoid null checking when
>> we know that there are no null values. If you provide the full code, I can
>> try and see if this is a bug.
>> >
>> > On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <
>> martin.senne@googlemail.com> wrote:
>> >>
>> >> Dear Michael, dear all,
>> >>
>> >> motivation:
>> >>
>> >> object OtherEntities {
>> >>
>> >>   case class Record( x:Int, a: String)
>> >>   case class Mapping( x: Int, y: Int )
>> >>
>> >>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>> >>   val mappings = Seq( Mapping(2, 5) )
>> >> }
>> >>
>> >> Now I want to perform a left outer join on records and mappings (with
>> >> the join criterion recordDF("x") === mappingDF("x")). The
>> >> shorthand for this is leftOuterJoinWithRemovalOfEqualColumn:
>> >>
>> >> val sqlContext = new SQLContext(sc)
>> >> // used to implicitly convert an RDD to a DataFrame.
>> >> import sqlContext.implicits._
>> >>
>> >> val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
>> >> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>> >>
>> >> val joinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
>> >>
>> >> joinedDF.filter(joinedDF("y").isNotNull).show
>> >>
>> >>
>> >> Currently, the output is
>> >>
>> >>
>> >> +-+-----+----+
>> >> |x|    a|   y|
>> >> +-+-----+----+
>> >> |1|hello|null|
>> >> |2|  bob|   5|
>> >> +-+-----+----+
>> >>
>> >> instead of
>> >>
>> >>
>> >> +-+---+-+
>> >> |x|  a|y|
>> >> +-+---+-+
>> >> |2|bob|5|
>> >> +-+---+-+
>> >>
>> >> The last output can be achieved by changing nullable=false to
>> >> nullable=true, as described in my first post.
>> >>
>> >> Thus, I need this schema modification in order to make outer joins work.
>> >>
>> >> Cheers and thanks,
>> >>
>> >> Martin
>> >>
>> >>
>> >>
>> >> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
>> >>>
>> >>> We don't yet update nullability information based on predicates, as
>> >>> we don't actually leverage this information in many places yet. Why do you
>> >>> want to update the schema?
>> >>>
>> >>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <
>> martin.senne@googlemail.com> wrote:
>> >>>>
>> >>>> Hi all,
>> >>>>
>> >>>> 1. *Columns in DataFrames can be nullable or not nullable. Having a
>> >>>> nullable column of Doubles, I can use the following Scala code to
>> >>>> filter all "non-null" rows:*
>> >>>>
>> >>>>   val df = ..... // some code that creates a DataFrame
>> >>>>   df.filter( df("columnname").isNotNull )
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> root
>> >>>>  |-- x: integer (nullable = false)
>> >>>>  |-- a: string (nullable = true)
>> >>>>  |-- y: integer (nullable = true)
>> >>>>
>> >>>> And with the filter expression
>> >>>> +-+---+-+
>> >>>> |x|  a|y|
>> >>>> +-+---+-+
>> >>>> |2|bob|5|
>> >>>> +-+---+-+
>> >>>>
>> >>>>
>> >>>> Unfortunately, while this is true for a nullable column (according to
>> >>>> df.printSchema), it is not true for a column that is not nullable:
>> >>>>
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> root
>> >>>>  |-- x: integer (nullable = false)
>> >>>>  |-- a: string (nullable = true)
>> >>>>  |-- y: integer (nullable = false)
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> such that the output is not affected by the filter. Is this intended?
>> >>>>
>> >>>>
>> >>>> 2. *What is the cheapest way (in the sense of performance) to turn a
>> >>>> non-nullable column into a nullable column?
>> >>>> I came up with this:*
>> >>>>
>> >>>>   /**
>> >>>>    * Set, if a column is nullable.
>> >>>>    * @param df source DataFrame
>> >>>>    * @param cn is the column name to change
>> >>>>    * @param nullable is the flag to set, such that the column is either nullable or not
>> >>>>    */
>> >>>>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
>> >>>>
>> >>>>     val schema = df.schema
>> >>>>     val newSchema = StructType(schema.map {
>> >>>>       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
>> >>>>       case y: StructField => y
>> >>>>     })
>> >>>>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>> >>>>   }
>> >>>>
>> >>>> Is there a cheaper solution?
>> >>>>
>> >>>> 3. *Any comments?*
>> >>>>
>> >>>> Cheers and thx in advance,
>> >>>>
>> >>>> Martin