You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by peng yu <yu...@gmail.com> on 2016/09/29 15:00:59 UTC

udf of aggregation in pyspark dataframe ?

Hi, 

is there a way to write a udf in pyspark support agg()? 


i search all over the docs and internet, and tested it out.. some say yes,
some say no.

and when i try those yes code examples, just complaint about

AnalysisException: u"expression 'pythonUDF' is neither present in the group
by, nor is it an aggregate function. Add to group by or wrap in first() (or
first_value) if you don't care which value you get.;"



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: udf of aggregation in pyspark dataframe ?

Posted by peng yu <yu...@gmail.com>.
btw, i am using spark 1.6.1



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


RE: udf of aggregation in pyspark dataframe ?

Posted by "Mendelson, Assaf" <As...@rsa.com>.
I may be missing something here, but it seems to me you can do it like this:
df.groupBy('a').agg(collect_list('c').alias("a",collect_list('d').alias("b")).withColumn('named_list'), my_zip(F.Col("a"), F.Col("b"))
without needing to write a new aggregation function

-----Original Message-----
From: peng yu [mailto:yupbank@gmail.com] 
Sent: Thursday, September 29, 2016 8:35 PM
To: user@spark.apache.org
Subject: Re: udf of aggregation in pyspark dataframe ?

df:  
---------
a|b|c
-------
1|m|n
1|x | j
2|m|x
...


import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType

def my_zip(c, d):
    return dict(zip(c, d))

my_zip = F.udf(_my_zip, MapType(StingType(), StringType(), True), True)

df.groupBy('a').agg(my_zip(collect_list('c'),
collect_list('d')).alias('named_list'))



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: udf of aggregation in pyspark dataframe ?

Posted by peng yu <yu...@gmail.com>.
df:  
---------
a|b|c
-------
1|m|n
1|x | j
2|m|x
...


import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType

def my_zip(c, d):
    return dict(zip(c, d))

my_zip = F.udf(_my_zip, MapType(StingType(), StringType(), True), True)

df.groupBy('a').agg(my_zip(collect_list('c'),
collect_list('d')).alias('named_list'))



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org