Posted to user@spark.apache.org by lu...@sina.com on 2016/07/12 08:10:05 UTC

Reply: Re: question about UDAF

Hi Pedro,

Thanks for your advice. I got my code working as below.

Code in main:

    import org.apache.spark.sql.functions.col

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val hiveTable = hc.sql("select lp_location_id,id,pv from house_id_pv_location_top50")
    val jsonArray = new JsonArray
    // substr(2, 2048) drops the leading comma that update prepends
    val middleResult = hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id"), col("pv")).substr(2, 2048).as("id_pv"))
    middleResult.collect.foreach(println)
    middleResult.write.saveAsTable("house_id_pv_top50_json")

Code in my UDAF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("id", IntegerType) :: StructField("pv", IntegerType) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("id", StringType) :: StructField("pv", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }
  // Append one JSON object per input row, each prefixed with a comma
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[String](0).concat(",{\"id\":\""+input.getInt(0).toString()+"\",\"pv\":\""+input.getInt(1).toString()+"\"}")
  }
  // The buffer column is a String, so read it with getString
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val s1 = buffer1.getString(0)
    val s2 = buffer2.getString(0)
    buffer1(0) = s1.concat(s2)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

The result is what I expected; see the attached file.
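
As a side note on the leading comma that update prepends and substr(2, 2048) then strips: a possible alternative is to join the per-row objects with a separator, which avoids both the stray comma and the fixed 2048-character cap. The following is only a minimal sketch in plain Scala with no Spark dependency. The names JsonArraySketch, toJsonObject and toJsonArray are hypothetical, and wrapping the result in brackets is an assumption, not what the UDAF above emits.

```scala
// Plain-Scala sketch (no Spark dependency); all names are hypothetical.
object JsonArraySketch {
  // Renders one (id, pv) pair in the same shape as the UDAF's update step.
  def toJsonObject(id: Int, pv: Int): String =
    s"""{"id":"$id","pv":"$pv"}"""

  // Joins the objects with commas and wraps them in brackets, so there is
  // no leading comma to strip and no fixed length cap.
  def toJsonArray(rows: Seq[(Int, Int)]): String =
    rows.map { case (id, pv) => toJsonObject(id, pv) }.mkString("[", ",", "]")

  def main(args: Array[String]): Unit = {
    println(toJsonArray(Seq((1, 10), (2, 20))))
    // prints [{"id":"1","pv":"10"},{"id":"2","pv":"20"}]
  }
}
```

In a UDAF, joining like this would fit most naturally in evaluate, with the buffer holding the individual objects; that part is left out here.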

--------------------------------

 

Thanks&Best regards!
San.Luo

----- Original Message -----
From: Pedro Rodriguez <sk...@gmail.com>
To: luohui20001@sina.com
Cc: user <us...@spark.apache.org>
Subject: Re: question about UDAF
Date: 2016-07-12 04:17

I am not sure I understand your code entirely, but I worked with UDAFs Friday and over the weekend (https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a).
I think what is going on is that your "update" function is not defined correctly. Update should take a possibly initialized or in-progress buffer and integrate the new input row into it. Right now, you ignore the input row: update assigns the buffer back to itself, so it keeps the "" that initialize set.
Merge is responsible for taking two buffers and merging them together. In this case, both buffers are "" since initialize sets "" and update keeps it "", so the result is just "". I am not sure it matters, but you probably also want to use buffer.getString(0) rather than getAs[Int](0), since the buffer column is a StringType.
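
To make the update/merge contract concrete, here is a minimal sketch in plain Scala with no Spark dependency. It only models how the methods might be called across partitions; it is not Spark's actual execution path. All names (UdafContractSketch, brokenUpdate, aggregate) are hypothetical, and the buffer is modelled as a plain String.

```scala
// Plain-Scala model of the UDAF life cycle (no Spark dependency).
// All names here are hypothetical and exist only for illustration.
object UdafContractSketch {
  // The aggregation buffer is modelled as a plain String.
  def initialize(): String = ""

  // Broken update, as in the original question: the input is ignored.
  def brokenUpdate(buffer: String, input: Int): String = buffer

  // Fixed update: fold the new input value into the buffer.
  def update(buffer: String, input: Int): String =
    if (buffer.isEmpty) input.toString else buffer + "," + input

  // Merge combines two partial buffers, e.g. from different partitions.
  def merge(b1: String, b2: String): String =
    if (b1.isEmpty) b2
    else if (b2.isEmpty) b1
    else b1 + "," + b2

  // Runs one aggregation: each inner Seq stands for one partition.
  def aggregate(partitions: Seq[Seq[Int]], upd: (String, Int) => String): String =
    partitions
      .map(_.foldLeft(initialize())(upd)) // update within a partition
      .foldLeft(initialize())(merge)      // merge across partitions

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2), Seq(3))
    println(aggregate(partitions, brokenUpdate)) // prints an empty line
    println(aggregate(partitions, update))       // prints 1,2,3
  }
}
```

With brokenUpdate every partition buffer stays "", so merging them can only give "", which matches the empty result in the question.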
Pedro
On Mon, Jul 11, 2016 at 3:04 AM,  <lu...@sina.com> wrote:
Hello guys,

I have a DF and a UDAF. The DF has 2 columns, lp_location_id and id, both of Int type. I want to group by lp_location_id and aggregate all values of id into 1 string, so I used a UDAF to do this transformation: multiple Int values to 1 String. However, my UDAF returns empty values, as shown in the attachment.

Here is the code for my main class:

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    val hiveTable = hc.sql("select lp_location_id,id from house_id_pv_location_top50")

    val jsonArray = new JsonArray
    val result = hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")).collect.foreach(println)
------------------------------------------------------------------
Here is the code of my UDAF:
class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("id", IntegerType) :: Nil)
    
  def bufferSchema: StructType = StructType(
    StructField("id", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val s1 = buffer1.getAs[Int](0).toString()
    val s2 = buffer2.getAs[Int](0).toString()
    buffer1(0) = s1.concat(s2)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

I don't quite understand why I get an empty result from my UDAF. I guess there may be 2 reasons:
1. wrong initialization with "" in the initialize method
2. the buffer was not written successfully.
Can anyone share an idea about this? Thank you.



--------------------------------

 

Thanks&Best regards!
San.Luo





-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience