Posted to user@spark.apache.org by Han-Cheol Cho <ha...@nhn-techorus.com> on 2017/02/01 10:25:07 UTC
A question about inconsistency during dataframe creation with RDD/dict in PySpark
Dear Spark user mailing list members,
My input data is quite messy, so it is difficult to load it as a DataFrame directly.
What I did instead was to load it as an RDD of strings first, convert that to an RDD of
pyspark.sql.Row objects, and then use the toDF() method, as below.
mydf = myrdd.map(parse).toDF()
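(To give an idea of what parse does: it is just a small helper of mine. The sketch below is a
simplified, hypothetical version of it, assuming tab-separated lines where some records have
fewer fields than others, which is how Rows with missing fields end up in the RDD.)

from pyspark.sql import Row

def parse(line):
    # hypothetical, simplified version of my helper:
    # split a tab-separated line and build a Row from whatever fields are present
    fields = line.rstrip("\n").split("\t")
    names = ["k%d" % (i + 1) for i in range(len(fields))]
    return Row(**dict(zip(names, fields)))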
I didn't expect any problem from this very simple code at first.
But when I tested it on a larger batch of data, I found that this approach fails with the
following exception.
java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 10 fields are required while 9 values are provided.
at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:147)
at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
...
This exception comes from the fact that some Row objects in the RDD have missing fields.
For example, the following code fails with the same exception:
d1 = [Row(k1="value1.1", k2="value1.2")]
d2 = [Row(k1="value2.1")]
rdd1 = spark.sparkContext.parallelize(d1)
rdd2 = spark.sparkContext.parallelize(d2)
urdd = rdd1.union(rdd2)
urdd.collect()
[Row(k1='value1.1', k2='value1.2'), Row(k1='value2.1')]
urdd.toDF()
DataFrame[k1: string, k2: string]
urdd.toDF().show()
--> fails with the same exception
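(Note that urdd.toDF() itself returns without an error because only the schema is inferred at
that point; the per-row check runs when an action is executed. I assume collect() hits the
same error as show(), e.g.:)

urdd.toDF().printSchema()   # works: schema is inferred as k1, k2
urdd.toDF().collect()       # presumably fails with the same IllegalStateException as show()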
While digging into the code, I found that a Row object raises an exception if
it does not have a given key, as shown below.
# spark/python/pyspark/sql/types.py
def _verify_type(obj, dataType, nullable=True):
    ...
    elif isinstance(dataType, StructType):
        ...
        elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
            # the order in obj could be different than dataType.fields
            for f in dataType.fields:
                _verify_type(obj[f.name], f.dataType, f.nullable)
--> obj[f.name] raises a ValueError(item) exception if the key does not exist.
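For example, in the pyspark shell something like the following should reproduce the failing
lookup on its own (a minimal check, not taken from my actual job):

from pyspark.sql import Row

r = Row(k1="value2.1")
r["k1"]   # returns 'value2.1'
r["k2"]   # raises ValueError: k2, because the Row has no such field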
I think that raising an exception in this situation is a reasonable approach.
However, if I use an RDD of dict objects instead of Row objects, the conversion
succeeds, filling the missing columns with null values, as follows.
dict1 = [{"k1":"v1.1", "k2":"v1.2"}]
dict2 = [{"k1":"v2.1"}]
rdd1 = spark.sparkContext.parallelize(dict1)
rdd2 = spark.sparkContext.parallelize(dict2)
rdd1.collect()
[{'k2': 'v1.2', 'k1': 'v1.1'}]
rdd2.collect()
[{'k1': 'v2.1'}]
urdd = rdd1.union(rdd2)
urdd.collect()
[{'k2': 'v1.2', 'k1': 'v1.1'}, {'k1': 'v2.1'}]
spark.createDataFrame(urdd).show()
+----+----+
| k1| k2|
+----+----+
|v1.1|v1.2|
|v2.1|null|
+----+----+
urdd.toDF().show()
+----+----+
| k1| k2|
+----+----+
|v1.1|v1.2|
|v2.1|null|
+----+----+
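(For what it's worth, I assume that normalizing the Row objects to plain dicts first, e.g. with
Row.asDict(), would give the Row-based pipeline the same null-filling behavior; a rough,
untested sketch using d1/d2 from the Row example above:)

row_urdd = spark.sparkContext.parallelize(d1).union(spark.sparkContext.parallelize(d2))
spark.createDataFrame(row_urdd.map(lambda r: r.asDict())).show()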
I wonder whether this difference is expected behavior or not.
Best wishes,
Han-cheol
Han-Cheol Cho, Data Laboratory / Data Scientist
Shinjuku Eastside Square 13F, 6-27-30 Shinjuku, Shinjuku-ku, Tokyo 160-0022, Japan
Email hancheol.cho@nhn-techorus.com