Posted to user@spark.apache.org by Nathan Ronsse <na...@gmail.com> on 2019/02/14 21:04:00 UTC

StackOverflow question regarding DataSets and mapGroups

Hello,
I am experiencing an issue that I also posted here.
<https://stackoverflow.com/questions/54681091/spark-2-0-2-3-datasets-groupbykey-and-mapgroups>

Maybe I should be using an Aggregator instead of mapGroups? I have not
found anything that would lead me to believe I am using mapGroups
incorrectly but please correct me if I am wrong.

I see the correct output when I run locally. However, when I run on a
cluster the output is different and seemingly inconsistent, although some
of the mapped groups' output is still correct. Is this an issue with Spark
closures? I am not sure how best to describe what I am seeing.

It is possible that I misunderstand mapGroups and that not all values for
each group are making it into the recordList variable.

import scala.collection.mutable.ListBuffer
import spark.implicits._ // assumes a SparkSession named `spark` is in scope

case class MyCaseClass(keyValue: Int, c2: String, c3: String, c4: Double)
case class NewClass(thing1: String, thing2: String, thing3: String, thing4: String)
case class WorkTodo(myClassRecords: Seq[MyCaseClass]) {
  def toNewRecords: Seq[NewClass] = {
    // e.g. work that requires all MyCaseClass.keyValue=1 to be in the list.
    // This function would create new Java objects to perform
    // calculations and eventually output a set of NewClass records.
    ??? // body elided in the original post
  }
}

val processedRecords = ds.as[MyCaseClass].groupByKey(_.keyValue)
  .mapGroups {
    case (v, iter) => {
      // Copy every record of the group into a local buffer.
      val recordList = new ListBuffer[MyCaseClass]
      iter.foreach { x =>
        recordList += MyCaseClass(x.keyValue, x.c2, x.c3, x.c4)
      }

      WorkTodo(recordList).toNewRecords
    }
  }
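
To make the expected behavior concrete, the following is a minimal sketch in
plain Scala collections (no Spark involved) of the per-group contract assumed
above: every record sharing a keyValue reaches the function together. The
names Rec, GroupSemantics and mapGroupsLocal, and the sample values, are
hypothetical and exist only for illustration. Spark's mapGroups passes an
Iterator rather than a Seq, so this is an approximation of the semantics, not
of the API.

```scala
// Hypothetical stand-in for MyCaseClass, reduced to two fields for illustration.
final case class Rec(keyValue: Int, c2: String)

object GroupSemantics {
  // Group records by key, then hand each complete group to f.
  // This mirrors the contract expected from groupByKey + mapGroups:
  // f sees all records for one key in a single call.
  def mapGroupsLocal[A](records: Seq[Rec])(f: (Int, Seq[Rec]) => A): Map[Int, A] =
    records.groupBy(_.keyValue).map { case (k, rs) => k -> f(k, rs) }

  // Made-up sample data: two records for key 1, one record for key 2.
  val sample: Seq[Rec] = Seq(Rec(1, "a"), Rec(2, "b"), Rec(1, "c"))

  // Number of records seen per key.
  val sizes: Map[Int, Int] = mapGroupsLocal(sample)((_, rs) => rs.size)
}
```

With the sample data, GroupSemantics.sizes maps key 1 to 2 and key 2 to 1. A
cluster run that reports fewer records for a key than this kind of local check
would point to the grouping step rather than to the per-group work.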

P.S. Any other solutions that still use a Dataset are welcome :)


Thanks,

Nate