Posted to user@spark.apache.org by Jone Zhang <jo...@gmail.com> on 2017/05/15 13:15:30 UTC

How can I merge multiple rows into one row in Spark SQL or Hive SQL?

For example
Data1(has 1 billion records)
user_id1  feature1
user_id1  feature2

Data2(has 1 billion records)
user_id1  feature3

Data3(has 1 billion records)
user_id1  feature4
user_id1  feature5
...
user_id1  feature100

I want to get the following result:
user_id1  feature1 feature2 feature3 feature4 feature5...feature100

Is there a more efficient way to do this than a join?

Thanks!

Re: How can i merge multiple rows to one row in sparksql or hivesql?

Posted by Edward Capriolo <ed...@gmail.com>.
Here is how I did something similar, though not identical, to what you
describe. I had two data files in different formats, and their columns needed
to become different features. I wanted to feed them into Spark's FP-growth:
https://en.wikibooks.org/wiki/Data_Mining_Algorithms_In_R/Frequent_Pattern_Mining/The_FP-Growth_Algorithm

This only works because I have a few named features, and they become fields
in the model object AntecedentUnion. This would be a crappy solution for a
large sparse matrix.

Also my Scala code is crap too so there is probably a better way to do this!


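    // Targeting rows: key by tdid, group, and wrap each row in the common
    // AntecedentUnion type. (Presumably the body of
    // TargetingUtil.targetingAntecedent, which is called further down.)
    val b = targ.as[TargetingAntecedent]
    val b1 = b.map(c => (c.tdid, c)).rdd.groupByKey()
    val bgen = b1.map(f =>
      (f._1, f._2.map(x => AntecedentUnion("targeting", "", x.targetingdataid, "", ""))))

    // Impression rows: same idea; toSet drops duplicates within each key.
    // (Presumably the body of TargetingUtil.impressionAntecedent.)
    val c = imp.as[ImpressionAntecedent]
    val c1 = c.map(k => (k.tdid, k)).rdd.groupByKey()
    val cgen = c1.map(f =>
      (f._1, f._2.map(x => AntecedentUnion("impression", "", "", x.campaignid, x.adgroupid)).toSet.toIterable))

    // Caller: build both keyed RDDs and join them on tdid.
    val bgen = TargetingUtil.targetingAntecedent(sparkSession, sqlContext, targ)
    val cgen = TargetingUtil.impressionAntecedent(sparkSession, sqlContext, imp)
    val joined = bgen.join(cgen)

    // Concatenate the two feature collections per key, then keep only the values.
    val merged = joined.map(f => (f._1, f._2._1 ++: f._2._2))
    val fullResults: RDD[Array[AntecedentUnion]] =
      merged.map(f => f._2).map(_.toArray[audacity.AntecedentUnion])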


So essentially I convert everything into AntecedentUnion, where the first
field is the type of the tuple and the other fields are filled in or left
empty. Then I merge all of those and run FP-growth on them. Hope that helps!




Re: How can i merge multiple rows to one row in sparksql or hivesql?

Posted by goun na <go...@gmail.com>.
To clarify my earlier message: collect_set is the one that eliminates
duplicates; collect_list keeps duplicated results.


Re: How can i merge multiple rows to one row in sparksql or hivesql?

Posted by goun na <go...@gmail.com>.
Hi, Jone Zhang

1. Hive UDF
You might need collect_set (to eliminate duplication) or collect_list, but
make sure to reduce the cardinality before applying the UDFs, as they can
cause problems when handling 1 billion records. Union datasets 1, 2, 3 ->
group by the user ID -> collect_set(feature column) would work.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

2. Spark DataFrame Pivot
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html

- Goun
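
As a rough illustration of option 2, here is a minimal sketch of a pivot over
the unioned datasets. The column names user_id and feature, the object name
PivotFeatures and the sample rows are assumptions made for the example; they
are not taken from the original data. Note that a pivot yields one column per
feature rather than a single space-separated list.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{count, lit}

object PivotFeatures {
  // One row per user and one column per feature name.
  // Each cell holds how many times that user had the feature (0 if never).
  def pivotFeatures(all: DataFrame, featureNames: Seq[String]): DataFrame =
    all.groupBy("user_id")
      .pivot("feature", featureNames) // explicit values: no extra pass to discover them
      .agg(count(lit(1)))
      .na.fill(0L)                    // missing user/feature combinations become 0

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pivot-features").getOrCreate()
    import spark.implicits._

    // Tiny stand-ins for Data1, Data2 and Data3 (hypothetical sample rows).
    val data1 = Seq(("user_id1", "feature1"), ("user_id1", "feature2")).toDF("user_id", "feature")
    val data2 = Seq(("user_id1", "feature3")).toDF("user_id", "feature")
    val data3 = Seq(("user_id1", "feature4"), ("user_id2", "feature1")).toDF("user_id", "feature")

    // union matches columns by position, so all inputs must share the same layout.
    val all = data1.union(data2).union(data3)
    pivotFeatures(all, Seq("feature1", "feature2", "feature3", "feature4")).show()

    spark.stop()
  }
}
```

With around 100 known features, passing the list explicitly as above avoids
the additional job Spark would otherwise run to find the distinct values.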


Re: How can i merge multiple rows to one row in sparksql or hivesql?

Posted by Didac Gil <di...@gmail.com>.
I guess that if your user_id field is the key, you could use the updateStateByKey function.

I did not test it, but it could be something along these lines:

// updateStateByKey is a Spark Streaming operation on key-value DStreams (it
// needs checkpointing enabled), so Data1 is assumed here to be a
// DStream[(String, String)] of (user_id, feature) pairs.
def yourCombineFunction(input: Seq[String], accumulatedInput: Option[String]): Option[String] = {
  // In case the current key was not found before, the features list is empty
  val state = accumulatedInput.getOrElse("")
  // Append the new feature values for this key to the accumulated list
  val newFeatures = (state +: input).filter(_.nonEmpty).mkString(" ")
  Some(newFeatures) // The new accumulated value for the features is returned
}

// For each key, folds the new entries into the accumulated features
val updatedData = Data1.updateStateByKey(yourCombineFunction _)

Good luck


Didac Gil de la Iglesia
PhD in Computer Science
didacgil9@gmail.com
Spain:     +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia


Re: How can i merge multiple rows to one row in sparksql or hivesql?

Posted by ayan guha <gu...@gmail.com>.
You may consider writing all your data to a NoSQL datastore such as HBase,
using the user ID as the key.

There is also a SQL solution using max with an inner case expression,
followed by a union of the results, but that may be expensive.
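
One possible reading of the max-and-case approach, sketched under assumptions:
union the datasets first, then emit one MAX(CASE ...) column per feature so
that each user collapses to a single row of 0/1 flags. The view name
all_features, the column names user_id and feature, and the helper names
below are hypothetical. The feature names are assumed to be plain identifiers,
since they are spliced directly into the SQL text.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MaxCaseFeatures {
  // Builds a query with one flag column per feature:
  //   SELECT user_id, MAX(CASE WHEN feature = 'f' THEN 1 ELSE 0 END) AS f, ...
  def conditionalAggSql(view: String, featureNames: Seq[String]): String = {
    val flags = featureNames
      .map(f => s"MAX(CASE WHEN feature = '$f' THEN 1 ELSE 0 END) AS $f")
      .mkString(",\n       ")
    s"SELECT user_id,\n       $flags\nFROM $view\nGROUP BY user_id"
  }

  // Registers the unioned data as a temporary view and runs the generated query.
  def flagFeatures(spark: SparkSession, all: DataFrame, featureNames: Seq[String]): DataFrame = {
    all.createOrReplaceTempView("all_features")
    spark.sql(conditionalAggSql("all_features", featureNames))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("max-case").getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for the union of Data1..Data3.
    val all = Seq(
      ("user_id1", "feature1"), ("user_id1", "feature2"), ("user_id2", "feature1")
    ).toDF("user_id", "feature")

    flagFeatures(spark, all, Seq("feature1", "feature2")).show()
    spark.stop()
  }
}
```

This still scans and shuffles every row once for the GROUP BY, so the cost
concern mentioned above applies.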

--
Best Regards,
Ayan Guha

Re: How can i merge multiple rows to one row in sparksql or hivesql?

Posted by Didac Gil <di...@gmail.com>.
Or maybe you could also try collect_list from the SQL functions:
val compacter = Data1.groupBy("UserID")
  .agg(org.apache.spark.sql.functions.collect_list("feature").as("ListOfFeatures"))
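
The snippet above aggregates Data1 only. Here is a minimal sketch of extending
it to all three datasets, assuming each has the same two string columns (named
user_id and feature here, which is an assumption): union them first, then
group once. collect_set is swapped in for collect_list so that repeated
features are dropped, and concat_ws with sort_array turns the set into a
single space-separated string in a stable order. The object name and the
sample rows are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{collect_set, concat_ws, sort_array}

object MergeFeatures {
  // Union all inputs, then collapse each user's features into one string column.
  def mergeFeatures(datasets: DataFrame*): DataFrame =
    datasets.reduce(_ union _) // union matches columns by position
      .groupBy("user_id")
      .agg(concat_ws(" ", sort_array(collect_set("feature"))).as("features"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("merge-features").getOrCreate()
    import spark.implicits._

    // Tiny stand-ins for Data1, Data2 and Data3 (hypothetical sample rows).
    val data1 = Seq(("user_id1", "feature1"), ("user_id1", "feature2")).toDF("user_id", "feature")
    val data2 = Seq(("user_id1", "feature3")).toDF("user_id", "feature")
    val data3 = Seq(("user_id1", "feature4"), ("user_id1", "feature5")).toDF("user_id", "feature")

    mergeFeatures(data1, data2, data3).show(truncate = false)
    spark.stop()
  }
}
```

Because sort_array orders strings lexicographically, feature10 would come
before feature2. Whether a single grouped aggregation is actually cheaper than
a chain of joins at a billion rows is something to measure rather than assume.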

