Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2016/10/08 13:12:20 UTC

[jira] [Comment Edited] (SPARK-11868) wrong results returned from dataframe created from Rows without consistent schema on pyspark

    [ https://issues.apache.org/jira/browse/SPARK-11868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15557944#comment-15557944 ] 

Hyukjin Kwon edited comment on SPARK-11868 at 10/8/16 1:11 PM:
---------------------------------------------------------------

FYI, it now prints differently:

{code}
>>> dicts = [{'1':1,'2':2,'3':3}]*10+[{'1':1,'3':3}]
>>> rows = [pyspark.sql.Row(**r) for r in dicts]
>>> rows_rdd = sc.parallelize(rows)
>>> dicts_rdd = sc.parallelize(dicts)
>>> rows_df = sqlContext.createDataFrame(rows_rdd)
>>> dicts_df = sqlContext.createDataFrame(dicts_rdd)
/Users/hyukjinkwon/Desktop/workspace/local/forked/spark/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. "
>>>
>>> print(rows_df.select(['2']).collect()[10])
16/10/08 22:10:03 ERROR Executor: Exception in task 7.0 in stage 2.0 (TID 9)
java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 3 fields are required while 2 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:136)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:656)
	...
>>> print(dicts_df.select(['2']).collect()[10])
Row(2=None)
{code}
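
As a side note (not part of the original report; just a sketch): if all of the field names can be collected up front, one workaround is to normalize the dictionaries so that every Row carries every field, which keeps the inferred schema and the data consistent:

{code}
>>> # Workaround sketch (hypothetical, not from the report): give every Row the full
>>> # key set, filling missing fields with None so the schema matches every record.
>>> import pyspark.sql
>>> dicts = [{'1': 1, '2': 2, '3': 3}] * 10 + [{'1': 1, '3': 3}]
>>> keys = sorted(set().union(*dicts))    # all field names seen across the dicts
>>> rows = [pyspark.sql.Row(**{k: d.get(k) for k in keys}) for d in dicts]
>>> normalized_df = sqlContext.createDataFrame(sc.parallelize(rows))
>>> print(normalized_df.select(['2']).collect()[10])   # expected: Row(2=None)
{code}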


> wrong results returned from dataframe created from Rows without consistent schema on pyspark
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11868
>                 URL: https://issues.apache.org/jira/browse/SPARK-11868
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 1.5.2
>         Environment: pyspark
>            Reporter: Yuval Tanny
>
> When the schema is inconsistent (but is the same for the first 10 rows), it is possible to create a dataframe from dictionaries, and if a key is missing its value is None. But when trying to create a dataframe from the corresponding Rows, we get incorrect results (values mapped to the wrong keys) without any exception. See the example below.
> The problems seem to be:
> 1. Schema inference does not verify all rows against the inferred schema.
> 2. In pyspark.sql.types._create_converter, None is set when converting a dictionary and a field does not exist:
> {code}
> return tuple([conv(d.get(name)) for name, conv in zip(names, converters)])
> {code}
> But for Rows, it is just assumed that the number of fields in the tuple equals the number of fields in the inferred schema, and otherwise values are placed under the wrong keys (see the sketch after this description):
> {code}
> return tuple(conv(v) for v, conv in zip(obj, converters))
> {code}
> Thanks. 
> example:
> {code}
> dicts = [{'1':1,'2':2,'3':3}]*10+[{'1':1,'3':3}]
> rows = [pyspark.sql.Row(**r) for r in dicts]
> rows_rdd = sc.parallelize(rows)
> dicts_rdd = sc.parallelize(dicts)
> rows_df = sqlContext.createDataFrame(rows_rdd)
> dicts_df = sqlContext.createDataFrame(dicts_rdd)
> print(rows_df.select(['2']).collect()[10])
> print(dicts_df.select(['2']).collect()[10])
> {code}
> output:
> {code}
> Row(2=3)
> Row(2=None)
> {code}
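
To make the two quoted conversion paths concrete, here is a minimal pure-Python sketch of the misalignment (no Spark needed; the field names and converters are illustrative stand-ins for what schema inference would produce):

{code}
# Illustrative stand-ins (hypothetical, for demonstration only) for the inferred
# field names and their per-field converters.
def make_converter(conv):
    # None-safe wrapper, mimicking how PySpark converters pass None through.
    return lambda v: None if v is None else conv(v)

names = ['1', '2', '3']
converters = [make_converter(int)] * 3

short_dict = {'1': 1, '3': 3}   # key '2' is missing
short_row = (1, 3)              # the tuple of values behind Row(**short_dict)

# Dict path: d.get(name) yields None for the missing key, so field '2' becomes None.
def from_dict(d):
    return tuple(conv(d.get(name)) for name, conv in zip(names, converters))

# Row/tuple path: values are zipped positionally against the converters, so a short
# tuple silently shifts values relative to the three-field schema.
def from_row(obj):
    return tuple(conv(v) for v, conv in zip(obj, converters))

print(from_dict(short_dict))   # (1, None, 3)
print(from_row(short_row))     # (1, 3) -> only two values for a three-field schema
{code}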



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org