Posted to issues@spark.apache.org by "Tomasz Belina (Jira)" <ji...@apache.org> on 2019/09/06 12:45:00 UTC

[jira] [Created] (SPARK-29009) Returning pojo from udf not working

Tomasz Belina created SPARK-29009:
-------------------------------------

             Summary: Returning pojo from udf not working
                 Key: SPARK-29009
                 URL: https://issues.apache.org/jira/browse/SPARK-29009
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.3
            Reporter: Tomasz Belina


 It looks like Spark is unable to construct a row from a POJO returned from a UDF.

Given the POJO:
{code:java}
// Lombok annotations assumed: the toString format in the stack trace below
// (SegmentStub(id=1, ...)) matches Lombok's @Data, and the UDF body uses an
// all-args constructor.
@Data
@AllArgsConstructor
public class SegmentStub {
    private int id;
    private Date statusDateTime;
    private int healthPointRatio;
}
{code}
Registration of the UDF:
{code:java}
public class ParseResultsUdf {

    // UDF_NAME is referenced but not defined in the original snippet;
    // a placeholder name is assumed here so the example is self-contained.
    private static final String UDF_NAME = "parseResultsUdf";

    public String registerUdf(SparkSession sparkSession) {
        Encoder<SegmentStub> encoder = Encoders.bean(SegmentStub.class);
        final StructType schema = encoder.schema();
        sparkSession.udf().register(UDF_NAME,
                (UDF2<String, String, SegmentStub>) (s, s2) -> new SegmentStub(1, Date.valueOf(LocalDate.now()), 2),
                schema
        );
        return UDF_NAME;
    }
}
{code}
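For reference, a workaround commonly used with the Java UDF API (a sketch, untested against this exact setup; it reuses the UDF_NAME and schema from the snippet above) is to return a Row instead of the bean, with values in the schema's field order, since Encoders.bean orders struct fields alphabetically by property name:
{code:java}
// Workaround sketch (assumption, not the reporter's code): return a Row
// rather than the POJO. The bean-derived schema orders fields alphabetically
// (healthPointRatio, id, statusDateTime), so the Row values must match.
sparkSession.udf().register(UDF_NAME,
        (UDF2<String, String, Row>) (s, s2) ->
                RowFactory.create(2, 1, Date.valueOf(LocalDate.now())),
        schema);
{code}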
Test code:
{code:java}
        List<String[]> strings = Arrays.asList(new String[]{"one", "two"},new String[]{"3", "4"});
        JavaRDD<Row> rowJavaRDD = sparkContext.parallelize(strings).map(RowFactory::create);

        StructType schema = DataTypes
                .createStructType(new StructField[] { DataTypes.createStructField("foe1", DataTypes.StringType, false),
                        DataTypes.createStructField("foe2", DataTypes.StringType, false) });


        Dataset<Row> dataFrame = sparkSession.sqlContext().createDataFrame(rowJavaRDD, schema);
        Seq<Column> columnSeq = new Set.Set2<>(col("foe1"), col("foe2")).toSeq();
        dataFrame.select(callUDF(udfName, columnSeq)).show();
{code}
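As an aside, the Set.Set2/Seq detour should not be necessary: functions.callUDF is @varargs-annotated, so (if I read the 2.4 API correctly) it can be called from Java with plain Column arguments:
{code:java}
// Equivalent call without constructing a Scala Seq by hand:
dataFrame.select(callUDF(udfName, col("foe1"), col("foe2"))).show();
{code}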
which throws the following exception:
{code:java}
Caused by: java.lang.IllegalArgumentException: The value (SegmentStub(id=1, statusDateTime=2019-09-06, healthPointRatio=2)) of the type (udf.SegmentStub) cannot be converted to struct<healthPointRatio:int,id:int,statusDateTime:date>
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
	... 21 more
{code}
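One detail worth noting in the message is the target struct's field order, struct<healthPointRatio:int,id:int,statusDateTime:date>, which differs from the POJO's declaration order. That part is expected: Encoders.bean derives the schema from bean properties sorted alphabetically. A plain-Java sketch of that ordering (property names taken from the POJO above):
{code:java}
import java.util.Arrays;

public class FieldOrder {
    static String schemaOrder() {
        // SegmentStub's properties in declaration order:
        String[] declared = {"id", "statusDateTime", "healthPointRatio"};
        String[] sorted = declared.clone();
        Arrays.sort(sorted); // alphabetical, as Encoders.bean does
        return String.join(",", sorted);
    }

    public static void main(String[] args) {
        // Matches the struct in the exception message.
        System.out.println(schemaOrder());
    }
}
{code}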
 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org