You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Han-Cheol Cho <ha...@nhn-techorus.com> on 2017/02/01 10:25:07 UTC

A question about inconsistency during dataframe creation with RDD/dict in PySpark

Dear spark user ml members,


I have quite messy input data so it is difficult to load them as a dataframe object
directly.
What I did is to load it as an RDD of strings first, convert it to an RDD of 
pyspark.sql.Row objects, then use toDF method as below.
    mydf = myrdd.map(parse).toDF()

I didn't expect any problem from this very simple code at first.


But, when I tested it with a bunch of data, I found that this approach fails with the
following exception.
    java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 10 fields are required while 9 values are provided.                                                                                             
            at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:147)
            at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
            at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
            at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)   
            ...

This exception comes from the fact that some Row objects in RDD have missing fields.
For example, the following example fails with the same exception
    d1 = [Row(k1="value1.1", k2="value1.2")]                                              
    d2 = [Row(k1="value2.1")]                                                                                                                         

    rdd1 = spark.sparkContext.parallelize(d1)                                                        
    rdd2 = spark.sparkContext.parallelize(d2)                                                        

    urdd = rdd1.union(rdd2)                                             
    urdd.collect()                                                      
        [Row(k1='value1.1', k2='value1.2'), Row(k1='value2.1')]                                  

    urdd.toDF()                                                                          
        DataFrame[k1: string, k2: string]                                                                          
    urdd.toDF().show()   
        --> fail with the same exception

While digging into the code, I found that Row object raises an exception if
it does not have a given key as follows.
# spark/python/pyspark/sql/types.py
def _verify_type(obj, dataType, nullable=True):
    ...
    elif isinstance(dataType, StructType):
        ...
        elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
            # the order in obj could be different than dataType.fields
            for f in dataType.fields:
                _verify_type(obj[f.name], f.dataType, f.nullable)
                       --> obj[f.name] raise ValueError(item) exception if the key does not exist.


I think that raising an exception in this situation is a reasonable approach.
However, if I use an RDD of dict objects, instead of Row objects, the convert process
succeed as follows by filling missing columns with null values.
    dict1 = [{"k1":"v1.1", "k2":"v1.2"}]
    dict2 = [{"k1":"v2.1"}]
    
    rdd1 = spark.sparkContext.parallelize(dict1)
    rdd2 = spark.sparkContext.parallelize(dict2)
    rdd1.collect()
        [{'k2': 'v1.2', 'k1': 'v1.1'}]
    rdd2.collect()
        [{'k1': 'v2.1'}]

    urdd = rdd1.union(rdd2)
    urdd.collect()
        [{'k2': 'v1.2', 'k1': 'v1.1'}, {'k1': 'v2.1'}]

    spark.createDataFrame(urdd).show()
        +----+----+
        |  k1|  k2|
        +----+----+
        |v1.1|v1.2|
        |v2.1|null|
        +----+----+

    urdd.toDF().show()
        +----+----+
        |  k1|  k2|
        +----+----+
        |v1.1|v1.2|
        |v2.1|null|
        +----+----+


I am wonder whether this difference is an expected result or not.


Best wishes,
Han-cheol



 Han-Cheol Cho  Data Laboratory   / Data Scientist <!-- <span id="deptLineBR"><br></span>  --> 〒160-0022 東京都新宿区新宿6-27-30 新宿イーストサイドスクエア13階
Email  hancheol.cho@nhn-techorus.com