You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by martinibus77 <ma...@googlemail.com> on 2015/07/30 20:19:33 UTC

Spark SQL DataFrame: Nullable column and filtering

Hi all,

1. *Columns in dataframes can be nullable and not nullable. Having a
nullable column of Doubles, I can use the following Scala code to filter all
"non-null" rows:*

  val df = ..... // some code that creates a DataFrame
  df.filter( df("columnname").isNotNull() )

+-+-----+----+                                                                  
|x|    a|   y|
+-+-----+----+
|1|hello|null|
|2|  bob|   5|
+-+-----+----+

root
 |-- x: integer (nullable = false)
 |-- a: string (nullable = true)
 |-- y: integer (nullable = true)

And with the filter expression
+-+---+-+                                                                       
|x|  a|y|
+-+---+-+
|2|bob|5|
+-+---+-+


Unfortunetaly and while this is a true for a nullable column (according to
df.printSchema), it is not true for a column that is not nullable:


+-+-----+----+                                                                  
|x|    a|   y|
+-+-----+----+
|1|hello|null|
|2|  bob|   5|
+-+-----+----+

root
 |-- x: integer (nullable = false)
 |-- a: string (nullable = true)
 |-- y: integer (nullable = false)

+-+-----+----+                                                                  
|x|    a|   y|
+-+-----+----+
|1|hello|null|
|2|  bob|   5|
+-+-----+----+

such that the output is not affected by the filter. Is this intended?


2. *What is the cheapest (in sense of performance) to turn a non-nullable
column into a nullable column?
A came uo with this:*

  /**
   * Set, if a column is nullable.
   * @param df source DataFrame 
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is either
nullable or not
   */
  def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
Boolean) : DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame( df.rdd, newSchema)
  }

Is there a cheaper solution?

3. *Any comments?*

Cheers and thx in advance,

Martin






--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark SQL DataFrame: Nullable column and filtering

Posted by Martin Senne <ma...@googlemail.com>.
Dear all,

after some fiddling I have arrived at this solution:

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF:
DataFrame, commonColumnName: String): DataFrame = {
  val joinedDF = leftDF.as('left).join(rightDF.as('right),
leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns.filterNot(cn =>
cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select( leftColumns ++ rightColumns: _*)
}

Comments welcome!!!!

Alternatives I tried:

   - Not Working: If at least the right alias for rightDF is present, one
   could try

   joinedDF.drop("right." + columnname)

   but his does not work (no column is dropped).
   Unfortunately, drop does not support arguments of type Column /
   ColumnNames. *@Michael: Should I create a feature request in Jira for
   drop supporting Columns?*

   -

   Working: Without using aliases via as(...), but using column
renaming instead:

   rightDF.withColumnRenamed( communColumnName, "right_" +
commoncolumnName) to rename the right dataframe column and then do the
join criterion as
   leftDF(commonColumnName) === rightDF("right_" + commonColumnName)

   In my opinion not so neat. Opinions?


Things I observed:

   - Column handling does not seem consistent
      - select(....) supports alias, while drop( ... ) only supports
      strings.
      - DataFrame.apply( .... ) and DataFrame.col do also not support alias.
      - Thus the only way to handly ambiguous columnNames is via select at
      the moment. Can someone please confirm this!
      - Alias information is not displayed via DataFrame.printSchema. (or
   at least I did not find a way of how to)

Cheers,

Martin

2015-07-31 22:51 GMT+02:00 Martin Senne <ma...@googlemail.com>:

> Dear Michael, dear all,
>
> a minimal example is listed below.
>
> After some further analysis I could figure out, that the problem is
> related to the *leftOuterJoinWithRemovalOfEqualColumn*-Method, as I use
> columns of the left and right dataframes when doing the select on the
> joined table.
>
>   /**
>    * Customized left outer join on common column.
>    */
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>       .select(leftColumns ++ rightColumns: _*)
>   }
>
> As the column "y" of the right table has nullable=false, this is then also transferred to the joined-Table y-Column, as I use rightDF( "y" ).
>
> Thus, I need to use columns of the joined table for the select.
>
> *Question now: The joined table has column names "x", "a", "x", "y". How do I discard the second x column?*
>
> All my approaches failed (assuming here, that joinedDF is the joined DataFrame.
>
>
>    - Using joinedDFdrop( "x" ) discards both "x" columns.
>    - Using joinedDF("x") does not work as it is ambigious
>    - Also using rightDF.as( "aliasname")  in order to differentiate the
>    column "x" (from left DataFrame) with "x" (from right DataFrame) did not
>    work out, as I found no way as use select( $"aliasname.x") really
>    programmatically. Could someone sketch the code?
>
> Any help welcome, thanks
>
>
> Martin
>
>
>
> ========================================
> import org.apache.spark.sql.types.{StructField, StructType}
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.{DataFrame, SQLContext}
>
> object OtherEntities {
>
>   case class Record( x:Int, a: String)
>   case class Mapping( x: Int, y: Int )
>
>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>   val mappings = Seq( Mapping(2, 5) )
> }
>
> object MinimalShowcase {
>
>   /**
>    * Customized left outer join on common column.
>    */
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>       .select(leftColumns ++ rightColumns: _*)
>   }
>
>
>   /**
>    * Set, if a column is nullable.
>    * @param df source DataFrame
>    * @param cn is the column name to change
>    * @param nullable is the flag to set, such that the column is either nullable or not
>    */
>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
>
>     val schema = df.schema
>     val newSchema = StructType(schema.map {
>       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
>       case y: StructField => y
>     })
>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>   }
>
>
>   def main (args: Array[String]) {
>     val conf = new SparkConf()
>       .setAppName("Minimal")
>       .setMaster("local[*]")
>
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     // used to implicitly convert an RDD to a DataFrame.
>     import sqlContext.implicits._
>
>     val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
>     val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>     val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)
>
>     val joinedDF = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "leftouter")
>     println("joinedDF:")
>     joinedDF.show
>     joinedDF.printSchema
>     joinedDF.filter(joinedDF("y").isNotNull).show
>
> //    joinedDF:
> //    +-+-----+----+----+
> //    |x|    a|   x|   y|
> //    +-+-----+----+----+
> //    |1|hello|null|null|
> //    |2|  bob|   2|   5|
> //    +-+-----+----+----+
> //
> //    root
> //    |-- x: integer (nullable = false)
> //    |-- a: string (nullable = true)
> //    |-- x: integer (nullable = true)
> //    |-- y: integer (nullable = true)
> //
> //    +-+---+-+-+
> //    |x|  a|x|y|
> //    +-+---+-+-+
> //    |2|bob|2|5|
> //    +-+---+-+-+
>
>
>     val extrajoinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
>     println("extrajoinedDF:")
>     extrajoinedDF.show
>     extrajoinedDF.printSchema
>     extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show
>
> //    extrajoinedDF:
> //    +-+-----+----+
> //    |x|    a|   y|
> //    +-+-----+----+
> //    |1|hello|null|
> //    |2|  bob|   5|
> //    +-+-----+----+
> //
> //    root
> //    |-- x: integer (nullable = false)
> //    |-- a: string (nullable = true)
> //    |-- y: integer (nullable = false)
> //
> //    +-+-----+----+
> //    |x|    a|   y|
> //    +-+-----+----+
> //    |1|hello|null|
> //    |2|  bob|   5|
> //    +-+-----+----+
>
>
>
>     val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") === mappingWithNullDF("x"), "leftouter")
>     println("joined2DF:")
>     joined2DF.show
>     joined2DF.printSchema
>     joined2DF.filter(joined2DF("y").isNotNull).show
>
> //    joined2DF:
> //    +-+-----+----+----+
> //    |x|    a|   x|   y|
> //    +-+-----+----+----+
> //    |1|hello|null|null|
> //    |2|  bob|   2|   5|
> //    +-+-----+----+----+
> //
> //    root
> //    |-- x: integer (nullable = false)
> //    |-- a: string (nullable = true)
> //    |-- x: integer (nullable = true)
> //    |-- y: integer (nullable = true)
> //
> //    +-+---+-+-+
> //    |x|  a|x|y|
> //    +-+---+-+-+
> //    |2|bob|2|5|
> //    +-+---+-+-+
>
>   }
> }
>
>
>
> 2015-07-31 1:56 GMT+02:00 Martin Senne <ma...@googlemail.com>:
>
>> Dear Michael, dear all,
>>
>> distinguishing those records that have a match in mapping from those that
>> don't is the crucial point.
>>
>> Record(x : Int,  a: String)
>> Mapping(x: Int, y: Int)
>>
>> Thus
>>
>> Record(1, "hello")
>> Record(2, "bob")
>> Mapping(2, 5)
>>
>> yield (2, "bob", 5) on an inner join.
>> BUT I'm also interested in (1, "hello", null) as there is no counterpart
>> in mapping (this is the left outer join part)
>>
>> I need to distinguish 1 and 2 because of later inserts (case 1, hello) or
>> updates (case 2, bon).
>>
>> Cheers and thanks,
>>
>> Martin
>>
>> Am 30.07.2015 22:58 schrieb "Michael Armbrust" <mi...@databricks.com>:
>> >
>> > Perhaps I'm missing what you are trying to accomplish, but if you'd
>> like to avoid the null values do an inner join instead of an outer join.
>> >
>> > Additionally, I'm confused about how the result
>> of joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
>> in the column y. This doesn't really have anything to do with nullable,
>> which is only a hint to the system so that we can avoid null checking when
>> we know that there are no null values. If you provide the full code i can
>> try and see if this is a bug.
>> >
>> > On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <
>> martin.senne@googlemail.com> wrote:
>> >>
>> >> Dear Michael, dear all,
>> >>
>> >> motivation:
>> >>
>> >> object OtherEntities {
>> >>
>> >>   case class Record( x:Int, a: String)
>> >>   case class Mapping( x: Int, y: Int )
>> >>
>> >>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>> >>   val mappings = Seq( Mapping(2, 5) )
>> >> }
>> >>
>> >> Now I want to perform an left outer join on records and mappings (with
>> the ON JOIN criterion on columns (recordDF("x") === mappingDF("x") ....
>> shorthand is in leftOuterJoinWithRemovalOfEqualColumn
>> >>
>> >> val sqlContext = new SQLContext(sc)
>> >> // used to implicitly convert an RDD to a DataFrame.
>> >> import sqlContext.implicits._
>> >>
>> >> val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
>> >> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>> >>
>> >> val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn(
>> mappingDF, "x")
>> >>
>> >> joinedDF.filter(joinedDF("y").isNotNull).show
>> >>
>> >>
>> >> Currently, the output is
>> >>
>> >>
>> +-+-----+----+
>>
>> >> |x|    a|   y|
>> >> +-+-----+----+
>> >> |1|hello|null|
>> >> |2|  bob|   5|
>> >> +-+-----+----+
>> >>
>> >> instead of
>> >>
>> >>
>> +-+---+-+
>>
>> >> |x|  a|y|
>> >> +-+---+-+
>> >> |2|bob|5|
>> >> +-+---+-+
>> >>
>> >> The last output can be achieved by the method of changing
>> nullable=false to nullable=true described in my first post.
>> >>
>> >> Thus, I need this schema modification as to make outer joins work.
>> >>
>> >> Cheers and thanks,
>> >>
>> >> Martin
>> >>
>> >>
>> >>
>> >> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
>> >>>
>> >>> We don't yet updated nullability information based on predicates as
>> we don't actually leverage this information in many places yet.  Why do you
>> want to update the schema?
>> >>>
>> >>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <
>> martin.senne@googlemail.com> wrote:
>> >>>>
>> >>>> Hi all,
>> >>>>
>> >>>> 1. *Columns in dataframes can be nullable and not nullable. Having a
>> >>>> nullable column of Doubles, I can use the following Scala code to
>> filter all
>> >>>> "non-null" rows:*
>> >>>>
>> >>>>   val df = ..... // some code that creates a DataFrame
>> >>>>   df.filter( df("columnname").isNotNull() )
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> root
>> >>>>  |-- x: integer (nullable = false)
>> >>>>  |-- a: string (nullable = true)
>> >>>>  |-- y: integer (nullable = true)
>> >>>>
>> >>>> And with the filter expression
>> >>>> +-+---+-+
>> >>>> |x|  a|y|
>> >>>> +-+---+-+
>> >>>> |2|bob|5|
>> >>>> +-+---+-+
>> >>>>
>> >>>>
>> >>>> Unfortunetaly and while this is a true for a nullable column
>> (according to
>> >>>> df.printSchema), it is not true for a column that is not nullable:
>> >>>>
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> root
>> >>>>  |-- x: integer (nullable = false)
>> >>>>  |-- a: string (nullable = true)
>> >>>>  |-- y: integer (nullable = false)
>> >>>>
>> >>>> +-+-----+----+
>> >>>> |x|    a|   y|
>> >>>> +-+-----+----+
>> >>>> |1|hello|null|
>> >>>> |2|  bob|   5|
>> >>>> +-+-----+----+
>> >>>>
>> >>>> such that the output is not affected by the filter. Is this intended?
>> >>>>
>> >>>>
>> >>>> 2. *What is the cheapest (in sense of performance) to turn a
>> non-nullable
>> >>>> column into a nullable column?
>> >>>> A came uo with this:*
>> >>>>
>> >>>>   /**
>> >>>>    * Set, if a column is nullable.
>> >>>>    * @param df source DataFrame
>> >>>>    * @param cn is the column name to change
>> >>>>    * @param nullable is the flag to set, such that the column is
>> either
>> >>>> nullable or not
>> >>>>    */
>> >>>>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
>> >>>> Boolean) : DataFrame = {
>> >>>>
>> >>>>     val schema = df.schema
>> >>>>     val newSchema = StructType(schema.map {
>> >>>>       case StructField( c, t, _, m) if c.equals(cn) => StructField(
>> c, t,
>> >>>> nullable = nullable, m)
>> >>>>       case y: StructField => y
>> >>>>     })
>> >>>>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>> >>>>   }
>> >>>>
>> >>>> Is there a cheaper solution?
>> >>>>
>> >>>> 3. *Any comments?*
>> >>>>
>> >>>> Cheers and thx in advance,
>> >>>>
>> >>>> Martin
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
>> >>>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>>
>> >>>> ---------------------------------------------------------------------
>> >>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>>
>> >>>
>> >>
>> >
>>
>>
>

Re: Spark SQL DataFrame: Nullable column and filtering

Posted by Martin Senne <ma...@googlemail.com>.
Dear Michael, dear all,

a minimal example is listed below.

After some further analysis I could figure out, that the problem is related
to the *leftOuterJoinWithRemovalOfEqualColumn*-Method, as I use columns of
the left and right dataframes when doing the select on the joined table.

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame,
rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn =>
cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) ===
rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

As the column "y" of the right table has nullable=false, this is then
also transferred to the joined-Table y-Column, as I use rightDF( "y"
).

Thus, I need to use columns of the joined table for the select.

*Question now: The joined table has column names "x", "a", "x", "y".
How do I discard the second x column?*

All my approaches failed (assuming here, that joinedDF is the joined DataFrame.


   - Using joinedDFdrop( "x" ) discards both "x" columns.
   - Using joinedDF("x") does not work as it is ambigious
   - Also using rightDF.as( "aliasname")  in order to differentiate the
   column "x" (from left DataFrame) with "x" (from right DataFrame) did not
   work out, as I found no way as use select( $"aliasname.x") really
   programmatically. Could someone sketch the code?

Any help welcome, thanks


Martin



========================================
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SQLContext}

object OtherEntities {

  case class Record( x:Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

object MinimalShowcase {

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame,
rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn =>
cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) ===
rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }


  /**
   * Set, if a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is
either nullable or not
   */
  def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
Boolean) : DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField( c, t, _, m) if c.equals(cn) => StructField( c,
t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame( df.rdd, newSchema)
  }


  def main (args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Minimal")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
    val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
    val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)

    val joinedDF = recordDF.join(mappingDF, recordDF("x") ===
mappingDF("x"), "leftouter")
    println("joinedDF:")
    joinedDF.show
    joinedDF.printSchema
    joinedDF.filter(joinedDF("y").isNotNull).show

//    joinedDF:
//    +-+-----+----+----+
//    |x|    a|   x|   y|
//    +-+-----+----+----+
//    |1|hello|null|null|
//    |2|  bob|   2|   5|
//    +-+-----+----+----+
//
//    root
//    |-- x: integer (nullable = false)
//    |-- a: string (nullable = true)
//    |-- x: integer (nullable = true)
//    |-- y: integer (nullable = true)
//
//    +-+---+-+-+
//    |x|  a|x|y|
//    +-+---+-+-+
//    |2|bob|2|5|
//    +-+---+-+-+


    val extrajoinedDF =
leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
    println("extrajoinedDF:")
    extrajoinedDF.show
    extrajoinedDF.printSchema
    extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show

//    extrajoinedDF:
//    +-+-----+----+
//    |x|    a|   y|
//    +-+-----+----+
//    |1|hello|null|
//    |2|  bob|   5|
//    +-+-----+----+
//
//    root
//    |-- x: integer (nullable = false)
//    |-- a: string (nullable = true)
//    |-- y: integer (nullable = false)
//
//    +-+-----+----+
//    |x|    a|   y|
//    +-+-----+----+
//    |1|hello|null|
//    |2|  bob|   5|
//    +-+-----+----+



    val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") ===
mappingWithNullDF("x"), "leftouter")
    println("joined2DF:")
    joined2DF.show
    joined2DF.printSchema
    joined2DF.filter(joined2DF("y").isNotNull).show

//    joined2DF:
//    +-+-----+----+----+
//    |x|    a|   x|   y|
//    +-+-----+----+----+
//    |1|hello|null|null|
//    |2|  bob|   2|   5|
//    +-+-----+----+----+
//
//    root
//    |-- x: integer (nullable = false)
//    |-- a: string (nullable = true)
//    |-- x: integer (nullable = true)
//    |-- y: integer (nullable = true)
//
//    +-+---+-+-+
//    |x|  a|x|y|
//    +-+---+-+-+
//    |2|bob|2|5|
//    +-+---+-+-+

  }
}



2015-07-31 1:56 GMT+02:00 Martin Senne <ma...@googlemail.com>:

> Dear Michael, dear all,
>
> distinguishing those records that have a match in mapping from those that
> don't is the crucial point.
>
> Record(x : Int,  a: String)
> Mapping(x: Int, y: Int)
>
> Thus
>
> Record(1, "hello")
> Record(2, "bob")
> Mapping(2, 5)
>
> yield (2, "bob", 5) on an inner join.
> BUT I'm also interested in (1, "hello", null) as there is no counterpart
> in mapping (this is the left outer join part)
>
> I need to distinguish 1 and 2 because of later inserts (case 1, hello) or
> updates (case 2, bon).
>
> Cheers and thanks,
>
> Martin
>
> Am 30.07.2015 22:58 schrieb "Michael Armbrust" <mi...@databricks.com>:
> >
> > Perhaps I'm missing what you are trying to accomplish, but if you'd like
> to avoid the null values do an inner join instead of an outer join.
> >
> > Additionally, I'm confused about how the result
> of joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
> in the column y. This doesn't really have anything to do with nullable,
> which is only a hint to the system so that we can avoid null checking when
> we know that there are no null values. If you provide the full code i can
> try and see if this is a bug.
> >
> > On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <
> martin.senne@googlemail.com> wrote:
> >>
> >> Dear Michael, dear all,
> >>
> >> motivation:
> >>
> >> object OtherEntities {
> >>
> >>   case class Record( x:Int, a: String)
> >>   case class Mapping( x: Int, y: Int )
> >>
> >>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
> >>   val mappings = Seq( Mapping(2, 5) )
> >> }
> >>
> >> Now I want to perform an left outer join on records and mappings (with
> the ON JOIN criterion on columns (recordDF("x") === mappingDF("x") ....
> shorthand is in leftOuterJoinWithRemovalOfEqualColumn
> >>
> >> val sqlContext = new SQLContext(sc)
> >> // used to implicitly convert an RDD to a DataFrame.
> >> import sqlContext.implicits._
> >>
> >> val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
> >> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
> >>
> >> val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn(
> mappingDF, "x")
> >>
> >> joinedDF.filter(joinedDF("y").isNotNull).show
> >>
> >>
> >> Currently, the output is
> >>
> >>
> +-+-----+----+
>
> >> |x|    a|   y|
> >> +-+-----+----+
> >> |1|hello|null|
> >> |2|  bob|   5|
> >> +-+-----+----+
> >>
> >> instead of
> >>
> >>
> +-+---+-+
>
> >> |x|  a|y|
> >> +-+---+-+
> >> |2|bob|5|
> >> +-+---+-+
> >>
> >> The last output can be achieved by the method of changing
> nullable=false to nullable=true described in my first post.
> >>
> >> Thus, I need this schema modification as to make outer joins work.
> >>
> >> Cheers and thanks,
> >>
> >> Martin
> >>
> >>
> >>
> >> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
> >>>
> >>> We don't yet updated nullability information based on predicates as we
> don't actually leverage this information in many places yet.  Why do you
> want to update the schema?
> >>>
> >>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <
> martin.senne@googlemail.com> wrote:
> >>>>
> >>>> Hi all,
> >>>>
> >>>> 1. *Columns in dataframes can be nullable and not nullable. Having a
> >>>> nullable column of Doubles, I can use the following Scala code to
> filter all
> >>>> "non-null" rows:*
> >>>>
> >>>>   val df = ..... // some code that creates a DataFrame
> >>>>   df.filter( df("columnname").isNotNull() )
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> root
> >>>>  |-- x: integer (nullable = false)
> >>>>  |-- a: string (nullable = true)
> >>>>  |-- y: integer (nullable = true)
> >>>>
> >>>> And with the filter expression
> >>>> +-+---+-+
> >>>> |x|  a|y|
> >>>> +-+---+-+
> >>>> |2|bob|5|
> >>>> +-+---+-+
> >>>>
> >>>>
> >>>> Unfortunetaly and while this is a true for a nullable column
> (according to
> >>>> df.printSchema), it is not true for a column that is not nullable:
> >>>>
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> root
> >>>>  |-- x: integer (nullable = false)
> >>>>  |-- a: string (nullable = true)
> >>>>  |-- y: integer (nullable = false)
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> such that the output is not affected by the filter. Is this intended?
> >>>>
> >>>>
> >>>> 2. *What is the cheapest (in sense of performance) to turn a
> non-nullable
> >>>> column into a nullable column?
> >>>> A came uo with this:*
> >>>>
> >>>>   /**
> >>>>    * Set, if a column is nullable.
> >>>>    * @param df source DataFrame
> >>>>    * @param cn is the column name to change
> >>>>    * @param nullable is the flag to set, such that the column is
> either
> >>>> nullable or not
> >>>>    */
> >>>>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
> >>>> Boolean) : DataFrame = {
> >>>>
> >>>>     val schema = df.schema
> >>>>     val newSchema = StructType(schema.map {
> >>>>       case StructField( c, t, _, m) if c.equals(cn) => StructField(
> c, t,
> >>>> nullable = nullable, m)
> >>>>       case y: StructField => y
> >>>>     })
> >>>>     df.sqlContext.createDataFrame( df.rdd, newSchema)
> >>>>   }
> >>>>
> >>>> Is there a cheaper solution?
> >>>>
> >>>> 3. *Any comments?*
> >>>>
> >>>> Cheers and thx in advance,
> >>>>
> >>>> Martin
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
> >>>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>>
> >>>> ---------------------------------------------------------------------
> >>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >>>> For additional commands, e-mail: user-help@spark.apache.org
> >>>>
> >>>
> >>
> >
>
>

Re: Spark SQL DataFrame: Nullable column and filtering

Posted by Martin Senne <ma...@googlemail.com>.
Dear Michael, dear all,

distinguishing those records that have a match in mapping from those that
don't is the crucial point.

Record(x : Int,  a: String)
Mapping(x: Int, y: Int)

Thus

Record(1, "hello")
Record(2, "bob")
Mapping(2, 5)

yield (2, "bob", 5) on an inner join.
BUT I'm also interested in (1, "hello", null) as there is no counterpart in
mapping (this is the left outer join part)

I need to distinguish 1 and 2 because of later inserts (case 1, hello) or
updates (case 2, bon).

Cheers and thanks,

Martin
Am 30.07.2015 22:58 schrieb "Michael Armbrust" <mi...@databricks.com>:
>
> Perhaps I'm missing what you are trying to accomplish, but if you'd like
to avoid the null values do an inner join instead of an outer join.
>
> Additionally, I'm confused about how the result
of joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
in the column y. This doesn't really have anything to do with nullable,
which is only a hint to the system so that we can avoid null checking when
we know that there are no null values. If you provide the full code i can
try and see if this is a bug.
>
> On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <
martin.senne@googlemail.com> wrote:
>>
>> Dear Michael, dear all,
>>
>> motivation:
>>
>> object OtherEntities {
>>
>>   case class Record( x:Int, a: String)
>>   case class Mapping( x: Int, y: Int )
>>
>>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>>   val mappings = Seq( Mapping(2, 5) )
>> }
>>
>> Now I want to perform an left outer join on records and mappings (with
the ON JOIN criterion on columns (recordDF("x") === mappingDF("x") ....
shorthand is in leftOuterJoinWithRemovalOfEqualColumn
>>
>> val sqlContext = new SQLContext(sc)
>> // used to implicitly convert an RDD to a DataFrame.
>> import sqlContext.implicits._
>>
>> val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
>> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>>
>> val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn(
mappingDF, "x")
>>
>> joinedDF.filter(joinedDF("y").isNotNull).show
>>
>>
>> Currently, the output is
>>
>>
+-+-----+----+

>> |x|    a|   y|
>> +-+-----+----+
>> |1|hello|null|
>> |2|  bob|   5|
>> +-+-----+----+
>>
>> instead of
>>
>>
+-+---+-+

>> |x|  a|y|
>> +-+---+-+
>> |2|bob|5|
>> +-+---+-+
>>
>> The last output can be achieved by the method of changing nullable=false
to nullable=true described in my first post.
>>
>> Thus, I need this schema modification as to make outer joins work.
>>
>> Cheers and thanks,
>>
>> Martin
>>
>>
>>
>> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
>>>
>>> We don't yet updated nullability information based on predicates as we
don't actually leverage this information in many places yet.  Why do you
want to update the schema?
>>>
>>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <
martin.senne@googlemail.com> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> 1. *Columns in dataframes can be nullable and not nullable. Having a
>>>> nullable column of Doubles, I can use the following Scala code to
filter all
>>>> "non-null" rows:*
>>>>
>>>>   val df = ..... // some code that creates a DataFrame
>>>>   df.filter( df("columnname").isNotNull() )
>>>>
>>>> +-+-----+----+
>>>> |x|    a|   y|
>>>> +-+-----+----+
>>>> |1|hello|null|
>>>> |2|  bob|   5|
>>>> +-+-----+----+
>>>>
>>>> root
>>>>  |-- x: integer (nullable = false)
>>>>  |-- a: string (nullable = true)
>>>>  |-- y: integer (nullable = true)
>>>>
>>>> And with the filter expression
>>>> +-+---+-+
>>>> |x|  a|y|
>>>> +-+---+-+
>>>> |2|bob|5|
>>>> +-+---+-+
>>>>
>>>>
>>>> Unfortunetaly and while this is a true for a nullable column
(according to
>>>> df.printSchema), it is not true for a column that is not nullable:
>>>>
>>>>
>>>> +-+-----+----+
>>>> |x|    a|   y|
>>>> +-+-----+----+
>>>> |1|hello|null|
>>>> |2|  bob|   5|
>>>> +-+-----+----+
>>>>
>>>> root
>>>>  |-- x: integer (nullable = false)
>>>>  |-- a: string (nullable = true)
>>>>  |-- y: integer (nullable = false)
>>>>
>>>> +-+-----+----+
>>>> |x|    a|   y|
>>>> +-+-----+----+
>>>> |1|hello|null|
>>>> |2|  bob|   5|
>>>> +-+-----+----+
>>>>
>>>> such that the output is not affected by the filter. Is this intended?
>>>>
>>>>
>>>> 2. *What is the cheapest (in sense of performance) to turn a
non-nullable
>>>> column into a nullable column?
>>>> A came uo with this:*
>>>>
>>>>   /**
>>>>    * Set, if a column is nullable.
>>>>    * @param df source DataFrame
>>>>    * @param cn is the column name to change
>>>>    * @param nullable is the flag to set, such that the column is either
>>>> nullable or not
>>>>    */
>>>>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
>>>> Boolean) : DataFrame = {
>>>>
>>>>     val schema = df.schema
>>>>     val newSchema = StructType(schema.map {
>>>>       case StructField( c, t, _, m) if c.equals(cn) => StructField( c,
t,
>>>> nullable = nullable, m)
>>>>       case y: StructField => y
>>>>     })
>>>>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>>>>   }
>>>>
>>>> Is there a cheaper solution?
>>>>
>>>> 3. *Any comments?*
>>>>
>>>> Cheers and thx in advance,
>>>>
>>>> Martin
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
>>>> Sent from the Apache Spark User List mailing list archive at
Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>
>>>
>>
>

Re: Spark SQL DataFrame: Nullable column and filtering

Posted by Michael Armbrust <mi...@databricks.com>.
Perhaps I'm missing what you are trying to accomplish, but if you'd like to
avoid the null values do an inner join instead of an outer join.

Additionally, I'm confused about how the result of joinedDF.filter(joinedDF(
"y").isNotNull).show still contains null values in the column y. This
doesn't really have anything to do with nullable, which is only a hint to
the system so that we can avoid null checking when we know that there are
no null values. If you provide the full code i can try and see if this is a
bug.

On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <ma...@googlemail.com>
wrote:

> Dear Michael, dear all,
>
> motivation:
>
> object OtherEntities {
>
>   case class Record( x:Int, a: String)
>   case class Mapping( x: Int, y: Int )
>
>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>   val mappings = Seq( Mapping(2, 5) )
> }
>
> Now I want to perform an *left outer join* on records and mappings (with the ON JOIN criterion on columns (recordDF("x") === mappingDF("x") .... shorthand is in *leftOuterJoinWithRemovalOfEqualColumn*
>
> val sqlContext = new SQLContext(sc)
> // used to implicitly convert an RDD to a DataFrame.
> import sqlContext.implicits._
>
> val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
>
> val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")
>
> joinedDF.filter(joinedDF("y").isNotNull).show
>
>
> Currently, the output is
>
> +-+-----+----+
>
> |x|    a|   y|
> +-+-----+----+
> |1|hello|null|
> |2|  bob|   5|
> +-+-----+----+
>
> instead of
>
> +-+---+-+
>
> |x|  a|y|
> +-+---+-+
> |2|bob|5|
> +-+---+-+
>
> The last output can be achieved by the method of changing nullable=false
> to nullable=true described in my first post.
>
> *Thus, I need this schema modification as to make outer joins work.*
>
> Cheers and thanks,
>
> Martin
>
>
>
> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mi...@databricks.com>:
>
>> We don't yet updated nullability information based on predicates as we
>> don't actually leverage this information in many places yet.  Why do you
>> want to update the schema?
>>
>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <
>> martin.senne@googlemail.com> wrote:
>>
>>> Hi all,
>>>
>>> 1. *Columns in dataframes can be nullable and not nullable. Having a
>>> nullable column of Doubles, I can use the following Scala code to filter
>>> all
>>> "non-null" rows:*
>>>
>>>   val df = ..... // some code that creates a DataFrame
>>>   df.filter( df("columnname").isNotNull() )
>>>
>>> +-+-----+----+
>>> |x|    a|   y|
>>> +-+-----+----+
>>> |1|hello|null|
>>> |2|  bob|   5|
>>> +-+-----+----+
>>>
>>> root
>>>  |-- x: integer (nullable = false)
>>>  |-- a: string (nullable = true)
>>>  |-- y: integer (nullable = true)
>>>
>>> And with the filter expression
>>> +-+---+-+
>>> |x|  a|y|
>>> +-+---+-+
>>> |2|bob|5|
>>> +-+---+-+
>>>
>>>
>>> Unfortunetaly and while this is a true for a nullable column (according
>>> to
>>> df.printSchema), it is not true for a column that is not nullable:
>>>
>>>
>>> +-+-----+----+
>>> |x|    a|   y|
>>> +-+-----+----+
>>> |1|hello|null|
>>> |2|  bob|   5|
>>> +-+-----+----+
>>>
>>> root
>>>  |-- x: integer (nullable = false)
>>>  |-- a: string (nullable = true)
>>>  |-- y: integer (nullable = false)
>>>
>>> +-+-----+----+
>>> |x|    a|   y|
>>> +-+-----+----+
>>> |1|hello|null|
>>> |2|  bob|   5|
>>> +-+-----+----+
>>>
>>> such that the output is not affected by the filter. Is this intended?
>>>
>>>
>>> 2. *What is the cheapest (in sense of performance) to turn a non-nullable
>>> column into a nullable column?
>>> A came uo with this:*
>>>
>>>   /**
>>>    * Set, if a column is nullable.
>>>    * @param df source DataFrame
>>>    * @param cn is the column name to change
>>>    * @param nullable is the flag to set, such that the column is either
>>> nullable or not
>>>    */
>>>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
>>> Boolean) : DataFrame = {
>>>
>>>     val schema = df.schema
>>>     val newSchema = StructType(schema.map {
>>>       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
>>> nullable = nullable, m)
>>>       case y: StructField => y
>>>     })
>>>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>>>   }
>>>
>>> Is there a cheaper solution?
>>>
>>> 3. *Any comments?*
>>>
>>> Cheers and thx in advance,
>>>
>>> Martin
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>>
>>
>

Re: Spark SQL DataFrame: Nullable column and filtering

Posted by Martin Senne <ma...@googlemail.com>.
Dear Michael, dear all,

motivation:

object OtherEntities {

  case class Record( x:Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

Now I want to perform an *left outer join* on records and mappings
(with the ON JOIN criterion on columns (recordDF("x") ===
mappingDF("x") .... shorthand is in
*leftOuterJoinWithRemovalOfEqualColumn*

val sqlContext = new SQLContext(sc)
// used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")

joinedDF.filter(joinedDF("y").isNotNull).show


Currently, the output is

+-+-----+----+

|x|    a|   y|
+-+-----+----+
|1|hello|null|
|2|  bob|   5|
+-+-----+----+

instead of

+-+---+-+

|x|  a|y|
+-+---+-+
|2|bob|5|
+-+---+-+

The last output can be achieved by the method of changing nullable=false to
nullable=true described in my first post.

*Thus, I need this schema modification as to make outer joins work.*

Cheers and thanks,

Martin



2015-07-30 20:23 GMT+02:00 Michael Armbrust <mi...@databricks.com>:

> We don't yet updated nullability information based on predicates as we
> don't actually leverage this information in many places yet.  Why do you
> want to update the schema?
>
> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <
> martin.senne@googlemail.com> wrote:
>
>> Hi all,
>>
>> 1. *Columns in dataframes can be nullable and not nullable. Having a
>> nullable column of Doubles, I can use the following Scala code to filter
>> all
>> "non-null" rows:*
>>
>>   val df = ..... // some code that creates a DataFrame
>>   df.filter( df("columnname").isNotNull() )
>>
>> +-+-----+----+
>> |x|    a|   y|
>> +-+-----+----+
>> |1|hello|null|
>> |2|  bob|   5|
>> +-+-----+----+
>>
>> root
>>  |-- x: integer (nullable = false)
>>  |-- a: string (nullable = true)
>>  |-- y: integer (nullable = true)
>>
>> And with the filter expression
>> +-+---+-+
>> |x|  a|y|
>> +-+---+-+
>> |2|bob|5|
>> +-+---+-+
>>
>>
>> Unfortunetaly and while this is a true for a nullable column (according to
>> df.printSchema), it is not true for a column that is not nullable:
>>
>>
>> +-+-----+----+
>> |x|    a|   y|
>> +-+-----+----+
>> |1|hello|null|
>> |2|  bob|   5|
>> +-+-----+----+
>>
>> root
>>  |-- x: integer (nullable = false)
>>  |-- a: string (nullable = true)
>>  |-- y: integer (nullable = false)
>>
>> +-+-----+----+
>> |x|    a|   y|
>> +-+-----+----+
>> |1|hello|null|
>> |2|  bob|   5|
>> +-+-----+----+
>>
>> such that the output is not affected by the filter. Is this intended?
>>
>>
>> 2. *What is the cheapest (in sense of performance) to turn a non-nullable
>> column into a nullable column?
>> A came uo with this:*
>>
>>   /**
>>    * Set, if a column is nullable.
>>    * @param df source DataFrame
>>    * @param cn is the column name to change
>>    * @param nullable is the flag to set, such that the column is either
>> nullable or not
>>    */
>>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
>> Boolean) : DataFrame = {
>>
>>     val schema = df.schema
>>     val newSchema = StructType(schema.map {
>>       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
>> nullable = nullable, m)
>>       case y: StructField => y
>>     })
>>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>>   }
>>
>> Is there a cheaper solution?
>>
>> 3. *Any comments?*
>>
>> Cheers and thx in advance,
>>
>> Martin
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Re: Spark SQL DataFrame: Nullable column and filtering

Posted by Michael Armbrust <mi...@databricks.com>.
We don't yet updated nullability information based on predicates as we
don't actually leverage this information in many places yet.  Why do you
want to update the schema?

On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <ma...@googlemail.com>
wrote:

> Hi all,
>
> 1. *Columns in dataframes can be nullable and not nullable. Having a
> nullable column of Doubles, I can use the following Scala code to filter
> all
> "non-null" rows:*
>
>   val df = ..... // some code that creates a DataFrame
>   df.filter( df("columnname").isNotNull() )
>
> +-+-----+----+
> |x|    a|   y|
> +-+-----+----+
> |1|hello|null|
> |2|  bob|   5|
> +-+-----+----+
>
> root
>  |-- x: integer (nullable = false)
>  |-- a: string (nullable = true)
>  |-- y: integer (nullable = true)
>
> And with the filter expression
> +-+---+-+
> |x|  a|y|
> +-+---+-+
> |2|bob|5|
> +-+---+-+
>
>
> Unfortunetaly and while this is a true for a nullable column (according to
> df.printSchema), it is not true for a column that is not nullable:
>
>
> +-+-----+----+
> |x|    a|   y|
> +-+-----+----+
> |1|hello|null|
> |2|  bob|   5|
> +-+-----+----+
>
> root
>  |-- x: integer (nullable = false)
>  |-- a: string (nullable = true)
>  |-- y: integer (nullable = false)
>
> +-+-----+----+
> |x|    a|   y|
> +-+-----+----+
> |1|hello|null|
> |2|  bob|   5|
> +-+-----+----+
>
> such that the output is not affected by the filter. Is this intended?
>
>
> 2. *What is the cheapest (in sense of performance) to turn a non-nullable
> column into a nullable column?
> A came uo with this:*
>
>   /**
>    * Set, if a column is nullable.
>    * @param df source DataFrame
>    * @param cn is the column name to change
>    * @param nullable is the flag to set, such that the column is either
> nullable or not
>    */
>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable:
> Boolean) : DataFrame = {
>
>     val schema = df.schema
>     val newSchema = StructType(schema.map {
>       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
> nullable = nullable, m)
>       case y: StructField => y
>     })
>     df.sqlContext.createDataFrame( df.rdd, newSchema)
>   }
>
> Is there a cheaper solution?
>
> 3. *Any comments?*
>
> Cheers and thx in advance,
>
> Martin
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>