Posted to user@spark.apache.org by Deenar Toraskar <de...@gmail.com> on 2016/01/13 18:22:26 UTC

Re: Spark SQL UDF with Struct input parameters

I have raised a JIRA to cover this
https://issues.apache.org/jira/browse/SPARK-12809

On 13 January 2016 at 16:05, Deenar Toraskar <
deenar.toraskar@thinkreactive.co.uk> wrote:

> Frank
>
> Sorry, I got my wires crossed; I had come across another issue. Now I
> remember this one: I got around it by splitting the structure into 2 arrays
> and then zipping them in the UDF. So
>
> def effectiveExpectedExposure(expectedExposures: Seq[(Seq[Float], Seq[Float])]) =
>   expectedExposures.map(x => x._1 * x._2).sum / expectedExposures.map(x => x._1).sum
>
>
> became
>
>
> def expectedPositiveExposureSeq(expectedExposures: Seq[Float], timeIntervals: Seq[Float]) =
>   timeIntervals.zip(expectedExposures).map(x => x._1 * x._2).sum / timeIntervals.sum
>
> Deenar
>
>
>
>
>
> *Think Reactive Ltd*
> deenar.toraskar@thinkreactive.co.uk
> 07714140812
>
>
>
> On 13 January 2016 at 15:42, Rosner, Frank (Allianz SE) <
> FRANK.ROSNER@allianz.com> wrote:
>
>> The problem is that I cannot use a UDF that has a structtype as input
>> (which seems to be the same problem that you were facing). Which key and
>> value are you talking about? They are both Seq[Float] in your example.
>>
>>
>>
>> In my example when I try to call a udf that takes a struct type I get
>>
>>
>>
>> cannot resolve 'UDF(myColumn)' due to data type mismatch: argument 1
>> requires array<struct<_1:bigint,_2:string>> type, however, 'myColumn' is of
>> array<struct<index:bigint,value:string>> type.
>>
>>
>>
>> When I then created a case class instead of using a tuple (so that the
>> fields have the correct names rather than _1 and _2), it compiles. But at
>> run time the values cannot be cast to the case class, because the data does
>> not actually contain instances of that case class.
>>
>>
>>
>> How would rewriting collect as a Spark UDAF help there?
>>
>>
>>
>> Thanks for your quick response!
>>
>> Frank
>>
>>
>>
>> *From:* Deenar Toraskar [mailto:deenar.toraskar@thinkreactive.co.uk]
>> *Sent:* Wednesday, 13 January 2016 15:56
>> *To:* Rosner, Frank (Allianz SE)
>> *Subject:* Re: Spark SQL UDF with Struct input parameters
>>
>>
>>
>> Frank
>>
>>
>>
>> I did not find a solution. As a workaround I made both the key and value
>> the same data type. I am going to rewrite collect as a Spark UDAF
>> when I have some spare time. You may want to do this if this is a
>> showstopper for you.
>>
>>
>>
>> Regards
>>
>> Deenar
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On 13 January 2016 at 13:50, Rosner, Frank (Allianz SE) <
>> FRANK.ROSNER@allianz.com> wrote:
>>
>> Hey!
>>
>> Did you solve the issue? I am facing the same issue and cannot find a
>> solution.
>>
>> Thanks
>> Frank
>>
>> Hi
>>
>>
>>
>> I am trying to define a UDF that can take an array of tuples as input:
>>
>>
>>
>> def effectiveExpectedExposure(expectedExposures: Seq[(Seq[Float], Seq[Float])]) =
>>   expectedExposures.map(x => x._1 * x._2).sum / expectedExposures.map(x => x._1).sum
>>
>>
>>
>> sqlContext.udf.register("effectiveExpectedExposure", effectiveExpectedExposure _)
>>
>>
>>
>> I get the following message when I try calling this function, where
>> noOfMonths and ee are both floats:
>>
>>
>>
>> val df = sqlContext.sql(s"select (collect(struct(noOfMonths, ee))) as eee from netExposureCpty where counterparty = 'xyz'")
>> df.registerTempTable("test")
>> sqlContext.sql("select effectiveExpectedExposure(eee) from test")
>>
>>
>>
>> Error in SQL statement: AnalysisException: cannot resolve 'UDF(eee)' due to
>> data type mismatch: argument 1 requires array<struct<_1:float,_2:float>>
>> type, however, 'eee' is of array<struct<noofmonths:float,ee:float>> type.;
>> line 1 pos 33
>>
>>
>>
>> Deenar
>>
>
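
The workaround quoted above (passing the exposures and the time intervals as two parallel arrays and zipping them inside the function) can be sketched in plain Scala as follows. This is only an illustration: the object name ExposureSketch, the length check, and the sample values are assumptions and do not appear in the original posts. The original one-liner relies on zip, which silently truncates to the shorter sequence.

```scala
// Time-weighted average exposure computed from two parallel sequences.
// Mirrors the zip-based workaround quoted in the thread.
// ExposureSketch, the require check, and the sample data are illustrative
// assumptions, not part of the original messages.
object ExposureSketch {
  def expectedPositiveExposureSeq(expectedExposures: Seq[Float],
                                  timeIntervals: Seq[Float]): Float = {
    // Added guard (assumption): zip alone would silently drop unmatched elements.
    require(expectedExposures.length == timeIntervals.length,
      "expectedExposures and timeIntervals must have the same length")
    // Sum of interval * exposure over all pairs.
    val weightedSum = timeIntervals.zip(expectedExposures)
      .map { case (interval, exposure) => interval * exposure }
      .sum
    // Normalise by the total time covered.
    weightedSum / timeIntervals.sum
  }

  def main(args: Array[String]): Unit = {
    val exposures = Seq(10.0f, 20.0f)
    val intervals = Seq(1.0f, 3.0f)
    // (1 * 10 + 3 * 20) / (1 + 3) = 17.5
    println(expectedPositiveExposureSeq(exposures, intervals))
  }
}
```

Because both parameters are plain Seq[Float], neither maps to a struct type, so the field-name mismatch (_1/_2 versus the real column names) reported in the thread should not come up for a function of this shape.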