You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "yzheng616 (JIRA)" <ji...@apache.org> on 2017/09/27 02:03:00 UTC

[jira] [Created] (SPARK-22137) Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)

yzheng616 created SPARK-22137:
---------------------------------

             Summary: Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)
                 Key: SPARK-22137
                 URL: https://issues.apache.org/jira/browse/SPARK-22137
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.1
            Reporter: yzheng616


Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String). The issue seems similar with SPARK-17765 which have been resolved in 2.1.0. 

Error message: 
{color:red}Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`features`' due to data type mismatch: cannot cast org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));;
'InsertIntoTable Relation[id#21,features#22] parquet, OverwriteOptions(false,Map()), false
+- 'Project [cast(id#13L as int) AS id#27, cast(features#14 as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) AS features#28]
   +- LogicalRDD [id#13L, features#14]{color}

Reproduce code:

{code:java}
import scala.annotation.varargs
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType


case class UDT(`id`: Long, `features`: org.apache.spark.ml.linalg.Vector)

object UDTTest {

  def main(args: Array[String]): Unit = {
    val tb = "table_udt"
    val spark = SparkSession.builder().master("local[4]").appName("UDTInsertInto").enableHiveSupport().getOrCreate()

    spark.sql("drop table if exists " + tb)
    
    /*
     * VectorUDT sql type definition:
     * 
     *   override def sqlType: StructType = {
     *   StructType(Seq(
     *   	StructField("type", ByteType, nullable = false),
     *   	StructField("size", IntegerType, nullable = true),
     *   	StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
     *   	StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)))
     *   }
    */
    
    //Create Hive table base on VectorUDT sql type
    spark.sql("create table if not exists "+tb+"(id int, features struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)" +
      " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
      " stored as"+
        " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
        " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")

    var seq = new scala.collection.mutable.ArrayBuffer[UDT]()
    for (x <- 1 to 2) {
      seq += (new UDT(x, org.apache.spark.ml.linalg.Vectors.dense(0.2, 0.21, 0.44)))
    }

    val rowRDD = (spark.sparkContext.makeRDD[UDT](seq)).map { x => Row.fromSeq(Seq(x.id,x.features)) }
    val schema = StructType(Array(StructField("id", LongType,false),StructField("features", SQLDataTypes.VectorType,false)))
    val df = spark.createDataFrame(rowRDD, schema)
     
    //insert into hive table
    df.write.insertInto(tb)
  }
}
{code}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org