Posted to user@spark.apache.org by Neha Mehta <ne...@gmail.com> on 2016/01/18 12:47:11 UTC

How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala

Hi,

I have a scenario wherein my dataset has around 30 columns of user activity
information. I need to group the records by user, and then, for each
column/activity parameter, find the fraction of that user's records that carry
each value in that column (the "percentage affinity"). Below is the sample
input and output.

UserId C1 C2 C3
1 A <20 0
1 A >20 & <40 1
1 B >20 & <40 0
1 C >20 & <40 0
1 C >20 & <40 0
2 A <20 0
3 B >20 & <40 1
3 B >40 2

Output


1 A:0.4|B:0.2|C:0.4 <20:0.2|>20 & <40:0.8 0:0.8|1:0.2
2 A:1 <20:1 0:1
3 B:1 >20 & <40:0.5|>40:0.5 1:0.5|2:0.5
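
(For example, user 1 has 5 records, of which 2 have C1 = A, so A gets 2/5 = 0.4;
the single <20 value in C2 gives 1/5 = 0.2 and the four >20 & <40 values give
4/5 = 0.8.)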

Presently this is how I am calculating these values:
Group by UserId and C1 and compute the fractions for C1 for all users, then
group by UserId and C2 and compute the fractions for C2 for each user, and so
on. This approach is quite slow. Also, the number of records per user is at
most 30, so I would like to take a second approach: do a groupByKey and pass
the entire list of records for each key to a function that computes all the
percentages for every column for that user at once. Below are the steps I am
trying to follow:

1. Dataframe1 => group by UserId and find the count of records for each user.
Join the result back to the input so that the count is available on each record.
2. Dataframe1.map(s => (s(1), s)).groupByKey().map(s => myUserAggregator(s._2))

import org.apache.spark.sql.Row
import scala.collection.mutable

def myUserAggregator(rows: Iterable[Row]): mutable.Map[Int, String] = {
  val returnValue = mutable.Map[Int, String]()
  if (rows != null) {
    // column index -> (column value -> occurrence count)
    val activityMap = mutable.Map[Int, mutable.Map[String, Int]]()

    var sentCount = 1
    for (row <- rows) {
      sentCount = row(29).toString.toInt  // record count for this user, joined in as the last column
      for (i <- 0 until row.length) {
        // build a fresh counter per column on first use (a shared withDefaultValue
        // instance would silently be reused across columns)
        val m = activityMap.getOrElseUpdate(i, mutable.Map[String, Int]().withDefaultValue(0))
        m(row(i).toString) += 1
      }
    }

    for ((k, v) <- activityMap) {
      // "value:fraction|value:fraction|..." for this column
      val rowVal = v.map { case (a, b) => a + ":" + b.toDouble / sentCount }.mkString("|")
      returnValue.update(k, rowVal)
    }
  }
  returnValue
}
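
For clarity, this is roughly how I intend step 2 to be wired up (just a sketch,
assuming the UserId is in column 1 as in step 2 above):

val perUser = Dataframe1
  .map(row => (row(1), row))                  // key every record by its UserId column
  .groupByKey()                               // (UserId, Iterable[Row])
  .mapValues(rows => myUserAggregator(rows))  // column index -> "value:fraction|..." per user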

When I run step 2 I get the following error. I am new to Scala and Spark
and am unable to figure out how to pass the Iterable[Row] to a function and
get back the results.

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.RDD.map(RDD.scala:317)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:102)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:104)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:106)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:108)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:110)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:112)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:114)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:116)
......


Thanks for the help.

Regards,
Neha Mehta

Re: How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala

Posted by Neha Mehta <ne...@gmail.com>.
Hi Vishal,

Thanks for the solution. I was able to get it working for my scenario.
Regarding the "Task not serializable" error, I still get it when I declare the
function outside the main method. However, if I declare it inside main as a
function value ("val func = {}"), it works fine for me.

In case you have any insight to share on the same, then please do share it.

Thanks for the help.

Regards,
Neha

Re: How to call a custom function from GroupByKey which takes Iterable[Row] as input and returns a Map[Int,String] as output in scala

Posted by Vishal Maru <vz...@gmail.com>.
It seems Spark is not able to serialize your function code to worker nodes.
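The usual cause: when the aggregator is a def on an enclosing class (or typed
into spark-shell, where each line becomes a member of the $iwC wrapper classes
you can see in your stack trace), calling it from inside map() captures the
whole enclosing instance, and that instance must be serializable. A rough
illustration of the pattern that triggers it, with made-up names:

import org.apache.spark.SparkContext

class Report(sc: SparkContext) {            // not Serializable
  def label(x: Int): String = "row-" + x    // a def is a method of this instance

  def run(): Array[String] = {
    val rdd = sc.parallelize(1 to 5)
    // Throws "Task not serializable": rdd.map(label) is really rdd.map(this.label),
    // so Spark tries to ship the whole Report instance (including sc) to the workers.
    rdd.map(label).collect()
  }
}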

I have tried to put a solution in a simple set of commands. Maybe you can
combine the last four lines into a function.


val arr = Array(
  (1, "A", "<20", "0"), (1, "A", ">20 & <40", "1"), (1, "B", ">20 & <40", "0"),
  (1, "C", ">20 & <40", "0"), (1, "C", ">20 & <40", "0"), (2, "A", "<20", "0"),
  (3, "B", ">20 & <40", "1"), (3, "B", ">40", "2"))

val rdd = sc.parallelize(arr)

// total number of records per user
val prdd = rdd.map(a => (a._1, a))
val totals = prdd.groupByKey.map(a => (a._1, a._2.size))

// count each (user, C1 value) pair, then divide by that user's total
val n1 = rdd.map(a => ((a._1, a._2), 1))
val n2 = n1.reduceByKey(_ + _).map(a => (a._1._1, (a._1._2, a._2)))
val n3 = n2.join(totals).map(a => (a._1, (a._2._1._1, a._2._1._2.toDouble / a._2._2)))
// fold the per-value fractions into one "value:fraction|..." string per user
val n4 = n3.map(a => (a._1, a._2._1 + ":" + a._2._2.toString)).reduceByKey((a, b) => a + "|" + b)

n4.collect.foreach(println)
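
This covers C1; the same counting idea extends to all three columns in one pass
if you flatMap each record into one pair per column (a rough, untested sketch):

// (user, columnIndex, value) -> count, then divide by the user's record total
val cols = rdd.map(a => (a._1, Array(a._2, a._3, a._4)))
val totalsAll = cols.map(a => (a._1, 1)).reduceByKey(_ + _)

val fractions = cols
  .flatMap { case (user, vs) => vs.zipWithIndex.map { case (v, i) => ((user, i, v), 1) } }
  .reduceByKey(_ + _)
  .map { case ((user, i, v), c) => (user, (i, v, c)) }
  .join(totalsAll)
  .map { case (user, ((i, v, c), total)) => ((user, i), v + ":" + c.toDouble / total) }
  .reduceByKey(_ + "|" + _)

fractions.collect.foreach(println)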



