Posted to user@spark.apache.org by Anand Nalya <an...@gmail.com> on 2015/07/02 15:49:08 UTC

Array fields in dataframe.write.jdbc

Hi,

I'm using Spark 1.4. I have an array field in my DataFrame, and when I try to
write this DataFrame to Postgres I get the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: Can't translate null value for field StructField(filter,ArrayType(StringType,false),true)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply$mcI$sp(jdbc.scala:182)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3$$anonfun$apply$1.apply(jdbc.scala:169)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:168)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$$anonfun$3.apply(jdbc.scala:167)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.jdbc.package$JDBCWriteDetails$.saveTable(jdbc.scala:167)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:258)
    at analytics.spark.summarizer.SparkBatchSummarizer.start(SparkBatchSummarizer.scala:155)
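
The write itself is the standard DataFrameWriter.jdbc call, roughly like this
(df is the summary DataFrame whose schema follows; the connection URL, table
name and credentials here are placeholders, not the real ones):

val url = "jdbc:postgresql://host:5432/analytics"   // placeholder
val props = new java.util.Properties()
props.setProperty("user", "...")
props.setProperty("password", "...")

// Roughly the call at SparkBatchSummarizer.scala:155 in the trace above.
df.write.jdbc(url, "summary", props)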

The schema for the dataframe is:

val schema = StructType(Seq(
  StructField("ts", LongType, false),
  StructField("filter", DataTypes.createArrayType(StringType, false), true),
  StructField("sort_by", StringType, true),
  StructField("user_type", StringType, true),
  StructField("count", LongType, false)
))

Sample dataframe contents:

+----------+-------+-------+---------+-----+
|        ts| filter|sort_by|user_type|count|
+----------+-------+-------+---------+-----+
|1435052400|List(s)|    abc|     null|    1|
|1435065300|List(s)|    abc|     null|    1|
+----------+-------+-------+---------+-----+
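
A minimal DataFrame built from the schema and sample rows above should be
enough to reproduce (sketch only; the real data comes from our summarizer
job):

import org.apache.spark.sql.Row

// Two rows matching the sample output above; user_type is null in both.
val rows = sc.parallelize(Seq(
  Row(1435052400L, Seq("s"), "abc", null, 1L),
  Row(1435065300L, Seq("s"), "abc", null, 1L)
))
val df = sqlContext.createDataFrame(rows, schema)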

org.apache.spark.sql.jdbc.JDBCWriteDetails#saveTable has the following
definition, which has no handling for array types:

def saveTable(
    df: DataFrame,
    url: String,
    table: String,
    properties: Properties = new Properties()) {
  val dialect = JdbcDialects.get(url)
  val nullTypes: Array[Int] = df.schema.fields.map { field =>
    dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
      field.dataType match {
        case IntegerType => java.sql.Types.INTEGER
        case LongType => java.sql.Types.BIGINT
        case DoubleType => java.sql.Types.DOUBLE
        case FloatType => java.sql.Types.REAL
        case ShortType => java.sql.Types.INTEGER
        case ByteType => java.sql.Types.INTEGER
        case BooleanType => java.sql.Types.BIT
        case StringType => java.sql.Types.CLOB
        case BinaryType => java.sql.Types.BLOB
        case TimestampType => java.sql.Types.TIMESTAMP
        case DateType => java.sql.Types.DATE
        case DecimalType.Unlimited => java.sql.Types.DECIMAL
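        // ArrayType matches none of the cases above, and (per the
        // Option.getOrElse frame in the stack trace) the Postgres dialect's
        // getJDBCType returned None for it, so the "filter" column falls
        // through to this: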
        case _ => throw new IllegalArgumentException(
          s"Can't translate null value for field $field")
      })
  }

  val rddSchema = df.schema
  df.foreachPartition { iterator =>
    JDBCWriteDetails.savePartition(url, table, iterator, rddSchema, nullTypes, properties)
  }
}
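
The only stopgap I can think of is flattening the array column to a plain
delimited string with a UDF before the JDBC write, which obviously loses the
array type on the Postgres side. Something like (sketch only; the delimiter
is arbitrary, and url/props are the same placeholders as above):

import org.apache.spark.sql.functions.udf

// Join the array into one string so the writer only sees a StringType column.
val joinFilter = udf((xs: Seq[String]) => if (xs == null) null else xs.mkString(","))

val flattened = df.select(
  df("ts"),
  joinFilter(df("filter")).as("filter"),
  df("sort_by"),
  df("user_type"),
  df("count"))

flattened.write.jdbc(url, "summary", props)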

Is there a better way of getting array fields working for now?

Thanks,
Anand