Posted to user@spark.apache.org by Archit Thakur <ar...@gmail.com> on 2014/01/26 21:22:51 UTC

GroupByKey implementation.

Hi,

Below is the implementation of groupByKey (v0.8.0):


def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
  def createCombiner(v: V) = ArrayBuffer(v)
  def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
  val bufs = combineByKey[ArrayBuffer[V]](
    createCombiner _, mergeValue _, null /* mergeCombiners */, partitioner,
    mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
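
For reference, the combineByKey overload being called here looks roughly like
this in the 0.8 source (paraphrased from memory, so treat the parameter order
and defaults as approximate); the null is the mergeCombiners argument:

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializerClass: String = null): RDD[(K, C)]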

and combineValuesByKey (Aggregator.scala):

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
  val combiners = new JHashMap[K, C]
  for (kv <- iter) {
    val oldC = combiners.get(kv._1)
    if (oldC == null) {
      combiners.put(kv._1, createCombiner(kv._2))   // first value seen for this key
    } else {
      combiners.put(kv._1, mergeValue(oldC, kv._2)) // fold further values in
    }
  }
  combiners.iterator
}
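
For context, Aggregator.scala also has a sibling method, combineCombinersByKey,
which is where mergeCombiners would actually be applied (quoted approximately
from the same file):

def combineCombinersByKey(iter: Iterator[(K, C)]): Iterator[(K, C)] = {
  val combiners = new JHashMap[K, C]
  for ((k, c) <- iter) {
    val oldC = combiners.get(k)
    if (oldC == null) {
      combiners.put(k, c)                       // first partial combiner for this key
    } else {
      combiners.put(k, mergeCombiners(oldC, c)) // merge two partial combiners
    }
  }
  combiners.iterator
}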

My question is: why is null passed for the mergeCombiners closure?

If two different partitions contain the same key, wouldn't the combiners
need to be merged afterwards?

Thanks,
Archit.

Re: GroupByKey implementation.

Posted by Archit Thakur <ar...@gmail.com>.
Thanks, Mark and Reynold, for the quick response.



Re: GroupByKey implementation.

Posted by Reynold Xin <rx...@databricks.com>.
While I echo Mark's sentiment, versioning has nothing to do with this
question: the behavior is the same even in Spark 0.8.0.

Note that mapSideCombine is turned off for groupByKey, so there are no
map-side partial combiners to merge: the shuffle delivers all of a key's raw
values to a single reduce partition, where combineValuesByKey builds each
combiner with createCombiner and mergeValue alone. That is why mergeCombiners
can safely be null here.
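
A minimal sketch of that reduce-side path (a simplified model, not the actual
Spark code):

import scala.collection.mutable.{ArrayBuffer, HashMap}

// With mapSideCombine = false, the map side writes raw (K, V) pairs and the
// shuffle routes every pair for a given key to one reduce partition. The
// reducer then builds each key's combiner from scratch, so mergeValue alone
// suffices and mergeCombiners is never invoked.
def reduceSideGroup[K, V](pairs: Iterator[(K, V)]): Iterator[(K, Seq[V])] = {
  val combiners = new HashMap[K, ArrayBuffer[V]]
  for ((k, v) <- pairs) {
    combiners.getOrElseUpdate(k, new ArrayBuffer[V]) += v
  }
  combiners.iterator.map { case (k, buf) => (k, buf.toSeq) }
}

mergeCombiners would only come into play with mapSideCombine = true, where
each map task pre-aggregates a partial combiner per key and those partial
combiners have to be merged on the reduce side (combineCombinersByKey in
Aggregator.scala).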



Re: GroupByKey implementation.

Posted by Mark Hamstra <ma...@clearstorydata.com>.
That was run on 0.8.0-incubating ... which raises a question that has been
recurring to me of late: why are people continuing to use 0.8.0 months after
0.8.1 has been out and when 0.9.0 is in release candidates?  It doesn't make
a relevant difference in this case, but in general, chasing bugs in code that
is two generations out of date doesn't make for very efficient development.
Spark is still pre-1.0, rapidly developing software.  As such, you should
expect the pain of staying up to date to be less than the pain of falling
months behind -- but there is no avoiding pain in pre-1.0 software.  Once we
reach more stability and more rigorous versioning/release practices with 1.0,
it will make more sense to stick with a major.minor release for a while and
only pick up the major.minor.patchlevel increments, but we're not there yet.



Re: GroupByKey implementation.

Posted by Archit Thakur <ar...@gmail.com>.
Which Spark version are you on?



Re: GroupByKey implementation.

Posted by Mark Hamstra <ma...@clearstorydata.com>.
groupByKey does merge the values associated with the same key in different
partitions:

scala> val rdd = sc.parallelize(List(1, 1, 1, 1), 4).mapPartitionsWithIndex(
     |   (idx, itr) => List(("foo", idx -> math.random), ("bar", idx -> math.random)).toIterator)

scala> rdd.collect.foreach(println)

(foo,(0,0.7387266457142971))
(bar,(0,0.06390701080780203))
(foo,(1,0.3601832111876926))
(bar,(1,0.5247725435958681))
(foo,(2,0.7486323021599729))
(bar,(2,0.9185837845634715))
(foo,(3,0.17591718413623136))
(bar,(3,0.12096331089133605))

scala> rdd.groupByKey.collect.foreach(println)

(foo,ArrayBuffer((0,0.8432285514154537), (1,0.3005967566708283),
(2,0.6150820518108783), (3,0.4779052219014124)))
(bar,ArrayBuffer((0,0.8190206253566251), (1,0.3465707665527258),
(2,0.5187789456090471), (3,0.9612998198743644)))
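
Note that the numbers differ between the two collects only because the RDD
isn't cached: each action recomputes the partitions, and math.random draws a
fresh value on every evaluation. The grouping across all four partitions is
the point. If you wanted the two collects to show the same values, you could
persist the RDD first, e.g.:

scala> rdd.cache   // computed once on the first action, then reused
scala> rdd.collect.foreach(println)
scala> rdd.groupByKey.collect.foreach(println)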

