Posted to user@spark.apache.org by Martin Senne <ma...@googlemail.com> on 2015/08/01 13:13:21 UTC
Re: Spark SQL DataFrame: Nullable column and filtering
Dear all,
after some fiddling I have arrived at this solution:
/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
                                          commonColumnName: String): DataFrame = {
  // Alias both sides so that columns of the joined table can be addressed unambiguously.
  val joinedDF = leftDF.as('left).join(rightDF.as('right),
    leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._

  // Keep all left columns, and all right columns except the common one.
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns
    .filterNot(cn => cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}
Comments welcome!!!!
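A minimal usage sketch of the helper above, applied to the Record/Mapping sample data from the quoted minimal example. It assumes the Spark 1.x-style API used in this thread (SQLContext, toDF via implicits) and a local master; the object name JoinUsageSketch is made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Sample data as in the quoted minimal example.
case class Record(x: Int, a: String)
case class Mapping(x: Int, y: Int)

// Hypothetical wrapper object, only for this sketch.
object JoinUsageSketch {

  // Same helper as above: columns are selected via the "left"/"right"
  // aliases of the joined table instead of via leftDF(...) / rightDF(...).
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
                                            commonColumnName: String): DataFrame = {
    val joinedDF = leftDF.as('left).join(rightDF.as('right),
      leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
    import joinedDF.sqlContext.implicits._
    val leftColumns = leftDF.columns.map(cn => $"left.$cn")
    val rightColumns = rightDF.columns.filterNot(_ == commonColumnName).map(cn => $"right.$cn")
    joinedDF.select(leftColumns ++ rightColumns: _*)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JoinUsageSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val recordDF = sc.parallelize(Seq(Record(1, "hello"), Record(2, "bob"))).toDF()
    val mappingDF = sc.parallelize(Seq(Mapping(2, 5))).toDF()

    val joinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
    joinedDF.printSchema()                            // columns: x, a, y
    joinedDF.filter(joinedDF("y").isNotNull).show()   // only the matched record remains

    sc.stop()
  }
}
```

Since the select goes through the joined table, joinedDF("y") is unambiguous afterwards and the isNotNull filter can be applied directly.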
Alternatives I tried:
- Not working: If at least the right alias for rightDF is present, one
  could try
      joinedDF.drop("right." + columnName)
  but this does not work (no column is dropped).
  Unfortunately, drop does not support arguments of type Column /
  ColumnName. *@Michael: Should I create a feature request in Jira for
  drop supporting Columns?*
- Working: Without using aliases via as(...), but using column
  renaming instead:
      rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)
  to rename the right DataFrame column, and then use as the join criterion
      leftDF(commonColumnName) === renamedRightDF("right_" + commonColumnName)
  where renamedRightDF is the DataFrame returned by withColumnRenamed.
  In my opinion not so neat. Opinions?
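The renaming alternative could be sketched as follows. This is only an illustration under assumptions: the names RenameJoinSketch, leftOuterJoinViaRename and renamedRightDF are made up here, and the final drop of the renamed column is an addition not described above (after the rename the column name is unique, so drop by string name removes exactly one column).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

// Sample data as in the quoted minimal example.
case class Record(x: Int, a: String)
case class Mapping(x: Int, y: Int)

// Hypothetical wrapper object, only for this sketch.
object RenameJoinSketch {

  // Left outer join on a common column, avoiding the name clash by renaming
  // the right-hand column before the join instead of using aliases.
  def leftOuterJoinViaRename(leftDF: DataFrame, rightDF: DataFrame,
                             commonColumnName: String): DataFrame = {
    val renamedName = "right_" + commonColumnName
    val renamedRightDF = rightDF.withColumnRenamed(commonColumnName, renamedName)
    leftDF
      .join(renamedRightDF,
        leftDF(commonColumnName) === renamedRightDF(renamedName), "leftouter")
      .drop(renamedName) // assumption: the renamed join column is no longer needed
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RenameJoinSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val recordDF = sc.parallelize(Seq(Record(1, "hello"), Record(2, "bob"))).toDF()
    val mappingDF = sc.parallelize(Seq(Mapping(2, 5))).toDF()

    val joinedDF = leftOuterJoinViaRename(recordDF, mappingDF, "x")
    joinedDF.printSchema() // columns: x, a, y
    joinedDF.show()

    sc.stop()
  }
}
```

The drawback remains that the "right_" prefix is a naming convention that may collide with an existing column name.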
Things I observed:
- Column handling does not seem consistent:
  - select(...) supports aliases, while drop(...) only supports
    strings.
  - DataFrame.apply(...) and DataFrame.col also do not support aliases.
- Thus the only way to handle ambiguous column names is via select at
  the moment. Can someone please confirm this?
- Alias information is not displayed via DataFrame.printSchema (or
  at least I did not find a way to do so).
Cheers,
Martin
2015-07-31 22:51 GMT+02:00 Martin Senne <ma...@googlemail.com>:
> Dear Michael, dear all,
>
> a minimal example is listed below.
>
> After some further analysis I figured out that the problem is
> related to the *leftOuterJoinWithRemovalOfEqualColumn* method, as I use
> columns of the left and right DataFrames when doing the select on the
> joined table.
>
> /**
>  * Customized left outer join on common column.
>  */
> def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>   val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>   val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>   leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>     .select(leftColumns ++ rightColumns: _*)
> }
>
> As the column "y" of the right table has nullable=false, this is then also transferred to the joined-Table y-Column, as I use rightDF( "y" ).
>
> Thus, I need to use columns of the joined table for the select.
>
> *Question now: The joined table has column names "x", "a", "x", "y". How do I discard the second x column?*
>
> All my approaches failed (assuming here that joinedDF is the joined DataFrame):
>
>
> - Using joinedDF.drop("x") discards both "x" columns.
> - Using joinedDF("x") does not work, as it is ambiguous.
> - Also using rightDF.as("aliasname") in order to differentiate the
> column "x" (from the left DataFrame) from "x" (from the right DataFrame) did not
> work out, as I found no way to use select($"aliasname.x") really
> programmatically. Could someone sketch the code?
>
> Any help welcome, thanks
>
>
> Martin
>
>
>
> ========================================
> import org.apache.spark.sql.types.{StructField, StructType}
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.{DataFrame, SQLContext}
>
> object OtherEntities {
>
>   case class Record(x: Int, a: String)
>   case class Mapping(x: Int, y: Int)
>
>   val records = Seq(Record(1, "hello"), Record(2, "bob"))
>   val mappings = Seq(Mapping(2, 5))
> }
>
> object MinimalShowcase {
>
>   /**
>    * Customized left outer join on common column.
>    */
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>       .select(leftColumns ++ rightColumns: _*)
>   }
>
>
>   /**
>    * Set, if a column is nullable.
>    * @param df source DataFrame
>    * @param cn is the column name to change
>    * @param nullable is the flag to set, such that the column is either nullable or not
>    */
>   def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
>
>     val schema = df.schema
>     val newSchema = StructType(schema.map {
>       case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
>       case y: StructField => y
>     })
>     df.sqlContext.createDataFrame(df.rdd, newSchema)
>   }
>
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf()
>       .setAppName("Minimal")
>       .setMaster("local[*]")
>
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     // used to implicitly convert an RDD to a DataFrame.
>     import sqlContext.implicits._
>
>     val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
>     val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>     val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)
>
>     val joinedDF = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "leftouter")
>     println("joinedDF:")
>     joinedDF.show
>     joinedDF.printSchema
>     joinedDF.filter(joinedDF("y").isNotNull).show
>
>     // joinedDF:
>     // +-+-----+----+----+
>     // |x|    a|   x|   y|
>     // +-+-----+----+----+
>     // |1|hello|null|null|
>     // |2|  bob|   2|   5|
>     // +-+-----+----+----+
>     //
>     // root
>     //  |-- x: integer (nullable = false)
>     //  |-- a: string (nullable = true)
>     //  |-- x: integer (nullable = true)
>     //  |-- y: integer (nullable = true)
>     //
>     // +-+---+-+-+
>     // |x|  a|x|y|
>     // +-+---+-+-+
>     // |2|bob|2|5|
>     // +-+---+-+-+
>
>
>     val extrajoinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
>     println("extrajoinedDF:")
>     extrajoinedDF.show
>     extrajoinedDF.printSchema
>     extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show
>
>     // extrajoinedDF:
>     // +-+-----+----+
>     // |x|    a|   y|
>     // +-+-----+----+
>     // |1|hello|null|
>     // |2|  bob|   5|
>     // +-+-----+----+
>     //
>     // root
>     //  |-- x: integer (nullable = false)
>     //  |-- a: string (nullable = true)
>     //  |-- y: integer (nullable = false)
>     //
>     // +-+-----+----+
>     // |x|    a|   y|
>     // +-+-----+----+
>     // |1|hello|null|
>     // |2|  bob|   5|
>     // +-+-----+----+
>
>
>
>     val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") === mappingWithNullDF("x"), "leftouter")
>     println("joined2DF:")
>     joined2DF.show
>     joined2DF.printSchema
>     joined2DF.filter(joined2DF("y").isNotNull).show
>
>     // joined2DF:
>     // +-+-----+----+----+
>     // |x|    a|   x|   y|
>     // +-+-----+----+----+
>     // |1|hello|null|null|
>     // |2|  bob|   2|   5|
>     // +-+-----+----+----+
>     //
>     // root
>     //  |-- x: integer (nullable = false)
>     //  |-- a: string (nullable = true)
>     //  |-- x: integer (nullable = true)
>     //  |-- y: integer (nullable = true)
>     //
>     // +-+---+-+-+
>     // |x|  a|x|y|
>     // +-+---+-+-+
>     // |2|bob|2|5|
>     // +-+---+-+-+
>
>   }
> }
>
>
>
> 2015-07-31 1:56 GMT+02:00 Martin Senne <ma...@googlemail.com>:
>
>> Dear Michael, dear all,
>>
>> distinguishing those records that have a match in mapping from those that
>> don't is the crucial point.
>>
>> Record(x : Int, a: String)
>> Mapping(x: Int, y: Int)
>>
>> Thus
>>
>> Record(1, "hello")
>> Record(2, "bob")
>> Mapping(2, 5)
>>
>> yield (2, "bob", 5) on an inner join.
>> BUT I'm also interested in (1, "hello", null) as there is no counterpart
>> in mapping (this is the left outer join part)
>>
>> I need to distinguish 1 and 2 because of later inserts (case 1, hello) or
>> updates (case 2, bob).
>>
>> Cheers and thanks,
>>
>> Martin
>>
>> On 30.07.2015 22:58, "Michael Armbrust" <mi...@databricks.com> wrote:
>> >
>> > Perhaps I'm missing what you are trying to accomplish, but if you'd
>> > like to avoid the null values do an inner join instead of an outer join.
>> >
>> > Additionally, I'm confused about how the result
>> > of joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
>> > in the column y. This doesn't really have anything to do with nullable,
>> > which is only a hint to the system so that we can avoid null checking when
>> > we know that there are no null values. If you provide the full code I can
>> > try and see if this is a bug.
>> >
>> > On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <
>> > martin.senne@googlemail.com> wrote:
>> >>
>> >> Dear Michael, dear all,
>> >>
>> >> motivation:
>> >>
>> >> object OtherEntities {
>> >>
>> >>   case class Record(x: Int, a: String)
>> >>   case class Mapping(x: Int, y: Int)
>> >>
>> >>   val records = Seq(Record(1, "hello"), Record(2, "bob"))
>> >>   val mappings = Seq(Mapping(2, 5))
>> >> }
>> >>
>> >> Now I want to perform a left outer join on records and mappings (with
>> >> the ON JOIN criterion on columns recordDF("x") === mappingDF("x")); the
>> >> shorthand is in leftOuterJoinWithRemovalOfEqualColumn:
>> >>
>> >> val sqlContext = new SQLContext(sc)
>> >> // used to implicitly convert an RDD to a DataFrame.
>> >> import sqlContext.implicits._
>> >>
>> >> val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
>> >> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>> >>
>> >> val joinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
>> >>
>> >> joinedDF.filter(joinedDF("y").isNotNull).show
>> >>
>> >>
>> >> Currently, the output is
>> >>
>> >> +-+-----+----+
>> >> |x|    a|   y|
>> >> +-+-----+----+
>> >> |1|hello|null|
>> >> |2|  bob|   5|
>> >> +-+-----+----+
>> >>
>> >> instead of
>> >>
>> >> +-+---+-+
>> >> |x|  a|y|
>> >> +-+---+-+
>> >> |2|bob|5|
>> >> +-+---+-+
>> >>
>> >> The last output can be achieved by the method of changing
>> >> nullable=false to nullable=true described in my first post.
>> >>
>> >> Thus, I need this schema modification in order to make outer joins work.
>> >>
>> >> Cheers and thanks,
>> >>
>> >> Martin
>> >>
>> >>
>> >>
>> >> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
>> >>>
>> >>> We don't yet update nullability information based on predicates, as
>> >>> we don't actually leverage this information in many places yet. Why do you
>> >>> want to update the schema?
>> >>>
>> >>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <
>> >>> martin.senne@googlemail.com> wrote:
>> >>>>
>> >>>> Hi all,
>> >>>>
>> >>>> 1. *Columns in DataFrames can be nullable or not nullable. Having a
>> >>>> nullable column of Doubles, I can use the following Scala code to
>> >>>> filter all "non-null" rows:*
>> >>>>
>> >>>> val df = ..... // some code that creates a DataFrame
>> >>>> df.filter( df("columnname").isNotNull )
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> root
>> >>>>  |-- x: integer (nullable = false)
>> >>>>  |-- a: string (nullable = true)
>> >>>>  |-- y: integer (nullable = true)
>> >>>>
>> >>>> And with the filter expression
>> >>>> +-+---+-+
>> >>>> |x|  a|y|
>> >>>> +-+---+-+
>> >>>> |2|bob|5|
>> >>>> +-+---+-+
>> >>>>
>> >>>>
>> >>>> Unfortunately, while this is true for a nullable column (according to
>> >>>> df.printSchema), it is not true for a column that is not nullable:
>> >>>>
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> root
>> >>>>  |-- x: integer (nullable = false)
>> >>>>  |-- a: string (nullable = true)
>> >>>>  |-- y: integer (nullable = false)
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> such that the output is not affected by the filter. Is this intended?
>> >>>>
>> >>>>
>> >>>> 2. *What is the cheapest way (in the sense of performance) to turn a
>> >>>> non-nullable column into a nullable column?
>> >>>> I came up with this:*
>> >>>>
>> >>>> /**
>> >>>>  * Set, if a column is nullable.
>> >>>>  * @param df source DataFrame
>> >>>>  * @param cn is the column name to change
>> >>>>  * @param nullable is the flag to set, such that the column is
>> >>>>  *        either nullable or not
>> >>>>  */
>> >>>> def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
>> >>>>
>> >>>>   val schema = df.schema
>> >>>>   val newSchema = StructType(schema.map {
>> >>>>     case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
>> >>>>     case y: StructField => y
>> >>>>   })
>> >>>>   df.sqlContext.createDataFrame(df.rdd, newSchema)
>> >>>> }
>> >>>>
>> >>>> Is there a cheaper solution?
>> >>>>
>> >>>> 3. *Any comments?*
>> >>>>
>> >>>> Cheers and thx in advance,
>> >>>>
>> >>>> Martin