Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:16:40 UTC
[jira] [Resolved] (SPARK-20846) Incorrect postgres sql array column schema inferred from table.
[ https://issues.apache.org/jira/browse/SPARK-20846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-20846.
----------------------------------
Resolution: Incomplete
> Incorrect postgres sql array column schema inferred from table.
> ---------------------------------------------------------------
>
> Key: SPARK-20846
> URL: https://issues.apache.org/jira/browse/SPARK-20846
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Stuart Reynolds
> Priority: Major
> Labels: bulk-closed
>
> When reading a table containing int[][] columns from postgres, the column is inferred as int[] (should be int[][]).
> {code:python}
> from pyspark.sql import SQLContext
> import pandas as pd
> from dataIngest.util.sqlUtil import asSQLAlchemyEngine
>
> user, password = ..., ...
> url = "postgresql://hostname:5432/dbname"
> url = 'jdbc:' + url
> properties = {'user': user, 'password': password}
> engine = ...  # SQLAlchemy engine
>
> # Create pandas df with int[] and int[][] columns
> df = pd.DataFrame({
>     'a1': [[1, 2, None], [1, 2, 3], None],
>     'b2': [[[1], [None], [3]], [[1], [2], [3]], None]
> })
>
> # Store df into postgres as table _dfjunk
> with engine.connect().execution_options(autocommit=True) as con:
>     con.execute("""
>         DROP TABLE IF EXISTS _dfjunk;
>
>         CREATE TABLE _dfjunk (
>             a1 int[] NULL,
>             b2 int[][] NULL
>         );
>     """)
>     df.to_sql("_dfjunk", con, index=False, if_exists="append")
>
> # Let's access it via Spark
> sc = get_spark_context(master="local")  # helper that returns a SparkContext
> sqlContext = SQLContext(sc)
>
> print("pandas DF as spark DF:")
> df = sqlContext.createDataFrame(df)
> df.printSchema()
> df.show()
> df.registerTempTable("df")
> print(sqlContext.sql("select * from df").collect())
>
> # Read table _dfjunk back from postgres as df3
> df3 = sqlContext.read.format("jdbc"). \
>     option("url", url). \
>     option("driver", "org.postgresql.Driver"). \
>     option("useUnicode", "true"). \
>     option("continueBatchOnError", "true"). \
>     option("useSSL", "false"). \
>     option("user", user). \
>     option("password", password). \
>     option("dbtable", "_dfjunk"). \
>     load()
> df3.registerTempTable("df3")
>
> print("DF inferred from postgres:")
> df3.printSchema()
> df3.show()
>
> print("DF queried from postgres:")
> df3 = sqlContext.sql("select * from df3")
> df3.printSchema()
> df3.show()
> print(df3.collect())
> {code}
> Errors out with:
> {noformat}
> pandas DF as spark DF:
> root
> |-- a1: array (nullable = true)
> | |-- element: long (containsNull = true)
> |-- b2: array (nullable = true)
> | |-- element: array (containsNull = true)
> | | |-- element: long (containsNull = true) <<< ****** THIS IS CORRECT !!!!
> +------------+--------------------+
> | a1| b2|
> +------------+--------------------+
> |[1, 2, null]|[WrappedArray(1),...|
> | [1, 2, 3]|[WrappedArray(1),...|
> | null| null|
> +------------+--------------------+
> [Row(a1=[1, 2, None], b2=[[1], [None], [3]]), Row(a1=[1, 2, 3], b2=[[1], [2], [3]]), Row(a1=None, b2=None)]
> DF inferred from postgres:
> root
> |-- a1: array (nullable = true)
> | |-- element: integer (containsNull = true)
> |-- b2: array (nullable = true)
> | |-- element: integer (containsNull = true) <<< ****** THIS IS WRONG!!!! It is actually an array of arrays.
> 17/05/22 15:00:39 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
> java.lang.ClassCastException: [Ljava.lang.Integer; cannot be cast to java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at org.apache.spark.sql.catalyst.util.GenericArrayData.getInt(GenericArrayData.scala:62)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
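> A likely contributing factor (an assumption, not stated in the report) is that PostgreSQL does not record array dimensionality in column metadata: int[] and int[][] both surface through JDBC with the same array type name, so the JDBC source wraps the element type in a single ArrayType. As a possible workaround sketch (not part of the original report), one could have Postgres serialize the nested column to text via a dbtable subquery and parse it back in Spark. The parse_pg_2d_array helper below is purely illustrative and reuses url, user, password and sqlContext from the reproduction above.
> {code:python}
> from pyspark.sql import functions as F
> from pyspark.sql.types import ArrayType, IntegerType
>
> # Push the cast down to Postgres so the JDBC source sees a plain text column
> # instead of mis-inferring the nested array.
> query = "(SELECT a1, b2::text AS b2_text FROM _dfjunk) AS t"
> df3 = sqlContext.read.format("jdbc") \
>     .option("url", url) \
>     .option("driver", "org.postgresql.Driver") \
>     .option("user", user) \
>     .option("password", password) \
>     .option("dbtable", query) \
>     .load()
>
> # Illustrative parser: turn a Postgres array literal such as '{{1},{NULL},{3}}'
> # back into a nested Python list, i.e. array<array<int>>.
> def parse_pg_2d_array(text):
>     if text is None:
>         return None
>     inner = text.strip("{}")
>     if not inner:
>         return []
>     rows = inner.split("},{")
>     return [[None if v == "NULL" else int(v) for v in row.strip("{}").split(",")]
>             for row in rows]
>
> parse_pg_2d_array_udf = F.udf(parse_pg_2d_array, ArrayType(ArrayType(IntegerType())))
> df3 = df3.withColumn("b2", parse_pg_2d_array_udf("b2_text")).drop("b2_text")
> df3.printSchema()  # b2 is now array<array<int>> as intended
> {code}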
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org