Posted to user@spark.apache.org by Justin Lent <ju...@gmail.com> on 2014/02/04 01:38:46 UTC

writing SparkR reducer functions

So I've been struggling with this for a bit using SparkR. I can't
even seem to write a basic mean/median function in R that works when
passing it into reduceByKey() for my very simple dataset. I can pass
in R's base function 'sum' and it works just fine. Looking at the help
shows that the signature for sum is sum(...) while mean and median
are mean(x), so that difference is why it's not working -- I just
can't for the life of me write wrappers for mean/median that work with
reduceByKey(). I've pasted in below a take(5) of the RDD I'm trying to
use reduceByKey() with, to show the structure (it's just a list of
lists). I've also pasted in the result of doing a groupByKey(),
because presumably this is the same data structure that reduceByKey()
gets before it does its reducing, right?
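
To show what I mean about the signature difference in plain R (the
one_arg name below is just mine for illustration):

sum(136, 136)                   # 272 -- sum(...) happily takes two arguments
one_arg <- function(x) mean(x)  # a mean(x)-style wrapper only takes one
one_arg(136, 136)               # Error: unused argument (136)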

FWIW, I've written this function that works fine when run on the
results of a groupByKey() operation, so I must be close to what it
should be to work in a reduceByKey():

avg_g <- function(resultFromGroupByKey) {
  values <- lapply(resultFromGroupByKey, "[[", 2)  # drop the keys, keep the value lists
  lapply(lapply(values, unlist), mean)             # flatten each group, then take its mean
}
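
For reference, this is roughly how I call it -- on the collected
output of groupByKey() (a sketch only; I'm assuming groupByKey() takes
a partition count like reduceByKey() does, and that collect() brings
the (K,V) pairs back to the driver):

grouped <- collect( groupByKey( sparkData_map2, 2L ) )
avg_g(grouped)   # list of per-key means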

My best guess at how to convert this to work with reduceByKey() is
below, since it works when I call it on a single value from the (K,V)
pairs returned by groupByKey(). Unfortunately, using it in
reduceByKey() results in a Java NPE.

avg <- function(x){ return( mean(unlist(x,recursive=FALSE)) ) }
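
Even if I rewrote the wrapper to take two arguments, I don't think a
pairwise mean is a valid reducer, since it isn't associative over more
than two values (quick base-R check; pair_mean is just an illustrative
name):

pair_mean <- function(a, b) mean(c(a, b))
pair_mean( pair_mean(1, 2), 6 )   # 3.75
mean( c(1, 2, 6) )                # 3 -- so pairwise means don't compose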


Any help would be appreciated... Here comes the cut/paste of the
data and the NPE trace.


THE DATA  (take 5, just assume the keys in the whole dataset go from A to Z)

[[1]]
[[1]][[1]]
[1] "A"

[[1]][[2]]
[1] 136


[[2]]
[[2]][[1]]
[1] "A"

[[2]][[2]]
[1] 136


[[3]]
[[3]][[1]]
[1] "A"

[[3]][[2]]
[1] 136


[[4]]
[[4]][[1]]
[1] "A"

[[4]][[2]]
[1] 136


[[5]]
[[5]][[1]]
[1] "A"

[[5]][[2]]
[1] 136



THE DATA AFTER GROUPBYKEY()

[[1]]
[[1]][[1]]
[1] "B"

[[1]][[2]]
[[1]][[2]][[1]]
[1] 136

[[1]][[2]][[2]]
[1] 136

[[1]][[2]][[3]]
[1] 136

[[1]][[2]][[4]]
[1] 136

[[1]][[2]][[5]]
[1] 136

[[1]][[2]][[6]]
[1] 136

[[1]][[2]][[7]]
[1] 136

[[1]][[2]][[8]]
[1] 136





> take( reduceByKey( sparkData_map2, avg, 2L ) , 5 )
Error in (function (x)  : unused argument (136)
Calls: do.call ... FUN -> lapply -> lapply -> FUN -> do.call -> <Anonymous>
Execution halted
14/02/03 16:37:47 ERROR Executor: Exception in task ID 1407
java.lang.NullPointerException
at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:116)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
14/02/03 16:37:47 WARN TaskSetManager: Lost TID 1407 (task 1415.0:0)
14/02/03 16:37:47 WARN TaskSetManager: Loss was due to
java.lang.NullPointerException
java.lang.NullPointerException
at edu.berkeley.cs.amplab.sparkr.RRDD.compute(RRDD.scala:116)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)
14/02/03 16:37:47 ERROR TaskSetManager: Task 1415.0:0 failed 1 times;
aborting job
Error in .jcall(rdd@jrdd, "[Ljava/util/List;", "collectPartitions",
.jarray(as.integer(index))) :
  org.apache.spark.SparkException: Job aborted: Task 1415.0:0 failed 1
times (most recent failure: Exception failure:
java.lang.NullPointerException)

Re: writing SparkR reducer functions

Posted by Justin Lent <ju...@gmail.com>.
After googling around, I realize how ridiculous my question is :(

Being new to Spark, for some reason I thought all of the basic "stats"
functions were implemented in a first-class way out of the box on top
of the MapReduce framework... oops! Sorry for the spam :)
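
In case it helps anyone searching later: the usual trick seems to be
to reduce (sum, count) pairs and divide at the end, since that
combiner is associative. A rough, untested sketch against the same RDD
as above (I'm assuming lapply() maps element-wise over the RDD here,
as in the SparkR examples; the variable names are mine):

# map each (key, value) pair to (key, c(sum, count)); '+' on those
# vectors is an associative binary reducer, unlike mean()
withCounts <- lapply( sparkData_map2, function(kv) list( kv[[1]], c(kv[[2]], 1) ) )
sumCounts  <- reduceByKey( withCounts, function(a, b) a + b, 2L )

# back on the driver, divide sum by count for each key to get the mean
means <- lapply( collect(sumCounts), function(kv) list( kv[[1]], kv[[2]][1] / kv[[2]][2] ) )

Median doesn't decompose like this, so for that I'd still go through
groupByKey() and call median() on each key's collected values.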





-- 
Sent from my iPhone