Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:16:40 UTC

[jira] [Resolved] (SPARK-20846) Incorrect postgres sql array column schema inferred from table.

     [ https://issues.apache.org/jira/browse/SPARK-20846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-20846.
----------------------------------
    Resolution: Incomplete

> Incorrect postgres sql array column schema inferred from table.
> ---------------------------------------------------------------
>
>                 Key: SPARK-20846
>                 URL: https://issues.apache.org/jira/browse/SPARK-20846
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Stuart Reynolds
>            Priority: Major
>              Labels: bulk-closed
>
> When reading a table containing int[][] columns from postgres, the column is inferred as int[] (should be int[][]).
> {code:python}
>     from pyspark.sql import SQLContext
>     import pandas as pd
>     from dataIngest.util.sqlUtil import asSQLAlchemyEngine
>     user,password = ..., ...
>     url = "postgresql://hostname:5432/dbname"
>     url = 'jdbc:'+url
>     properties = {'user': user, 'password': password}
>     engine = ... sql alchemy engine ...
>     # Create pandas df with int[] and int[][]
>     df = pd.DataFrame({
>         'a1': [[1,2,None],[1,2,3], None],
>         'b2':  [[[1],[None],[3]], [[1],[2],[3]], None]
>     })
>     # Store df into postgres as table _dfjunk
>     with engine.connect().execution_options(autocommit=True) as con:
>         con.execute("""
>         DROP TABLE IF EXISTS _dfjunk;
>         
>         CREATE TABLE _dfjunk (
>           a1 int[] NULL,
>           b2 int[][] NULL
>         );
>         """)
>         df.to_sql("_dfjunk", con, index=None, if_exists="append")
>     # Let's access via spark
>     sc = get_spark_context(master="local")
>     sqlContext = SQLContext(sc)
>     print "pandas DF as spark DF:"
>     df = sqlContext.createDataFrame(df)
>     df.printSchema()
>     df.show()
>     df.registerTempTable("df")
>     print sqlContext.sql("select * from df").collect()
>     ### Export _dfjunk as table df3
>     df3 = sqlContext.read.format("jdbc"). \
>         option("url", url). \
>         option("driver", "org.postgresql.Driver"). \
>         option("useUnicode", "true"). \
>         option("continueBatchOnError","true"). \
>         option("useSSL", "false"). \
>         option("user", user). \
>         option("password", password). \
>         option("dbtable", "_dfjunk").\
>         load()
>     df3.registerTempTable("df3")
>     print "DF inferred from postgres:"
>     df3.printSchema()
>     df3.show()
>     print "DF queried from postgres:"
>     df3 = sqlContext.sql("select * from df3")
>     df3.printSchema()
>     df3.show()
>     print df3.collect()
> {code}
> Errors out with:
> {noformat}
> pandas DF as spark DF:
> root
>  |-- a1: array (nullable = true)
>  |    |-- element: long (containsNull = true)
>  |-- b2: array (nullable = true)
>  |    |-- element: array (containsNull = true)
>  |    |    |-- element: long (containsNull = true)  <<< ****** THIS IS CORRECT !!!!
> +------------+--------------------+
> |          a1|                  b2|
> +------------+--------------------+
> |[1, 2, null]|[WrappedArray(1),...|
> |   [1, 2, 3]|[WrappedArray(1),...|
> |        null|                null|
> +------------+--------------------+
> [Row(a1=[1, 2, None], b2=[[1], [None], [3]]), Row(a1=[1, 2, 3], b2=[[1], [2], [3]]), Row(a1=None, b2=None)]
> DF inferred from postgres:
> root
>  |-- a1: array (nullable = true)
>  |    |-- element: integer (containsNull = true)
>  |-- b2: array (nullable = true)
>  |    |-- element: integer (containsNull = true)    <<< ****** THIS IS WRONG!!!! Is an array of arrays.
> 17/05/22 15:00:39 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
> java.lang.ClassCastException: [Ljava.lang.Integer; cannot be cast to java.lang.Integer
> 	at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> 	at org.apache.spark.sql.catalyst.util.GenericArrayData.getInt(GenericArrayData.scala:62)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:99)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
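
A possible workaround, sketched here under assumptions that are not part of the report: PostgreSQL does not enforce the declared number of array dimensions and treats int[] and int[][] as the same array type, so the column metadata alone may not reveal the nesting depth. One way to sidestep the inferred schema would be to read the column as text, for example by passing a subquery such as "(select a1, b2::text as b2 from _dfjunk) as t" as the dbtable option, and then parse the array literal on the Python side. The helper name parse_pg_array is hypothetical, and the parser below handles only integers and NULL, not quoted strings or other element types.

```python
def _parse_list(s, pos):
    """Parse one '{...}' group starting at s[pos]; return (list, next_pos)."""
    if pos >= len(s) or s[pos] != "{":
        raise ValueError("expected '{' at position %d" % pos)
    pos += 1
    items = []
    if pos < len(s) and s[pos] == "}":  # empty array '{}'
        return items, pos + 1
    while True:
        if pos < len(s) and s[pos] == "{":
            # Nested dimension: recurse.
            item, pos = _parse_list(s, pos)
        else:
            # Scalar element: read up to the next ',' or '}'.
            end = pos
            while end < len(s) and s[end] not in ",}":
                end += 1
            token = s[pos:end].strip()
            item = None if token.upper() == "NULL" else int(token)
            pos = end
        items.append(item)
        if pos >= len(s):
            raise ValueError("unterminated array literal")
        if s[pos] == ",":
            pos += 1
        elif s[pos] == "}":
            return items, pos + 1
        else:
            raise ValueError("unexpected character at position %d" % pos)


def parse_pg_array(text):
    """Turn a PostgreSQL integer array literal into nested Python lists.

    Hypothetical helper: supports integers and NULL only.
    A SQL NULL column value (Python None) is passed through unchanged.
    """
    if text is None:
        return None
    s = text.strip()
    value, pos = _parse_list(s, 0)
    if pos != len(s):
        raise ValueError("trailing characters after array literal")
    return value


print(parse_pg_array("{{1},{NULL},{3}}"))  # [[1], [None], [3]]
```

In Spark this function could presumably be wrapped in a UDF whose return type is declared explicitly as an array of arrays of integers, so the nesting no longer depends on what the JDBC source infers. That step is not shown and has not been tested against the setup in this report.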


