Posted to user@spark.apache.org by Anand Nalya <an...@gmail.com> on 2015/07/02 15:49:08 UTC
Array fields in dataframe.write.jdbc
Hi,
I'm using Spark 1.4. I have an array field in my dataframe, and when I try to
write the dataframe to Postgres I get the following exception:
Exception in thread "main" java.lang.IllegalArgumentException: Can't translate null value for field StructField(filter,ArrayType(StringType,false),true)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply$mcI$sp(jdbc.scala:182)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:168)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:167)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.saveTable(jdbc.scala:167)
  at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:258)
  at analytics.spark.summarizer.SparkBatchSummarizer.start(SparkBatchSummarizer.scala:155)
The schema for the dataframe is:

  val schema = StructType(Seq(
    StructField("ts", LongType, false),
    StructField("filter", DataTypes.createArrayType(StringType, false), true),
    StructField("sort_by", StringType, true),
    StructField("user_type", StringType, true),
    StructField("count", LongType, false)
  ))
Sample dataframe contents:
+----------+-------+-------+---------+-----+
| ts| filter|sort_by|user_type|count|
+----------+-------+-------+---------+-----+
|1435052400|List(s)| abc| null| 1|
|1435065300|List(s)| abc| null| 1|
+----------+-------+-------+---------+-----+
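The write itself is the standard DataFrameWriter call; the connection URL and
table name below are placeholders for my actual settings:

  df.write.jdbc("jdbc:postgresql://host/db", "summary", new java.util.Properties())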
org.apache.spark.sql.jdbc.JDBCWriteDetails#saveTable has the following
definition, which has no handling for array types:
  def saveTable(
      df: DataFrame,
      url: String,
      table: String,
      properties: Properties = new Properties()) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
        field.dataType match {
          case IntegerType => java.sql.Types.INTEGER
          case LongType => java.sql.Types.BIGINT
          case DoubleType => java.sql.Types.DOUBLE
          case FloatType => java.sql.Types.REAL
          case ShortType => java.sql.Types.INTEGER
          case ByteType => java.sql.Types.INTEGER
          case BooleanType => java.sql.Types.BIT
          case StringType => java.sql.Types.CLOB
          case BinaryType => java.sql.Types.BLOB
          case TimestampType => java.sql.Types.TIMESTAMP
          case DateType => java.sql.Types.DATE
          case DecimalType.Unlimited => java.sql.Types.DECIMAL
          case _ => throw new IllegalArgumentException(
            s"Can't translate null value for field $field")
        })
    }
    val rddSchema = df.schema
    df.foreachPartition { iterator =>
      JDBCWriteDetails.savePartition(url, table, iterator, rddSchema,
        nullTypes, properties)
    }
  }
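From the code above, the only extension point appears to be the dialect
lookup: if a registered JdbcDialect returned a JdbcType for the array field,
the nullTypes computation would succeed. A hypothetical, untested sketch (I
suspect savePartition would still fail on non-null array values, since its
type match has no ArrayType case either):

  import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
  import org.apache.spark.sql.types._

  // Answers the getJDBCType lookup for string arrays with Postgres's text[].
  object PostgresArrayDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean =
      url.startsWith("jdbc:postgresql")
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
      case ArrayType(StringType, _) => Some(JdbcType("TEXT[]", java.sql.Types.ARRAY))
      case _ => None
    }
  }

  JdbcDialects.registerDialect(PostgresArrayDialect)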
Is there some way to get arrays working for now?
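The only fallback I can think of is flattening the array to a delimited
string before the write, so the column maps to a plain TEXT/CLOB. A rough
sketch (assumes a comma never appears inside a filter value):

  import org.apache.spark.sql.functions.{col, udf}

  // Join the array elements into one comma-separated string.
  val arrayToString =
    udf((xs: Seq[String]) => if (xs == null) null else xs.mkString(","))

  val writable = df.select(
    col("ts"),
    arrayToString(col("filter")).as("filter"),
    col("sort_by"),
    col("user_type"),
    col("count"))

Is there anything better?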
Thanks,
Anand