Posted to user@spark.apache.org by Steve Lewis <lo...@gmail.com> on 2014/09/19 22:37:56 UTC

Reproducing the function of a Hadoop Reducer

I am struggling to reproduce the functionality of a Hadoop reducer on
Spark (in Java).

In Hadoop I have a function
public void doReduce(K key, Iterator<V> values)
and Hadoop also supplies a consumer (context.write) which can be seen as
consume(key, value)

In my code:
1) knowing the key is important to the function
2) there is neither one output Tuple2 per key nor one output Tuple2 per
value
3) the number of values per key might be large enough that storing them in
memory is impractical
4) keys must appear in sorted order

One good example would run through a large document, using a similarity
function to look at the last 200 lines and output any of those with a
similarity of more than 0.3 (do not suggest outputting everything and
filtering - the real problem is more complex). The critical concern is an
uncertain number of tuples per key.

My questions:
1) how can this be done? Ideally the consumer would be a JavaPairRDD, but I
don't see how to create one and add items to it later.
2) how do I handle the whole partition/sort/process pipeline (the part that
involves calls to doReduce)?

Re: Reproducing the function of a Hadoop Reducer

Posted by Victor Tso-Guillen <vt...@paxata.com>.
   1. Actually, I disagree that combineByKey requires that all values be
   held in memory for a key. Only groupByKey does that; reduceByKey,
   foldByKey, and the generic combineByKey do not necessarily make that
   requirement. If your combine logic really shrinks the result value by a
   lot, I think it would be worth making sure mapSideCombine is true.
   2. In order to get the key into the combine logic, you may need to
   project it into a (K, (K, V)) - see the sketch after this list. I'm not
   sure there's a method that otherwise provides the information you're
   asking for. Unfortunately, that is a lot more heavyweight.
   3. If you absolutely need the keys in sorted order before you combine,
   then perhaps you could sortByKey before doing your combineByKey, but you
   pay the price of a bigger shuffle doing it that way.
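
To make point 2 concrete, here is roughly what that projection could look
like with the Java API (a sketch I have not run; the Agg class, its count
field, and the method names are just placeholders for whatever per-key
state and naming your real logic needs):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public final class KeyAwareCombine {

    // Placeholder combiner state: keeps the key plus a running count.
    public static class Agg<K> implements java.io.Serializable {
        public final K key;
        public long count;
        public Agg(K key) { this.key = key; }
    }

    public static <K, V> JavaPairRDD<K, Agg<K>> combineKeepingKey(JavaPairRDD<K, V> input) {
        // step 1: project the key into the value, (K, V) -> (K, (K, V)),
        // so every combine function can see which key it is working on
        JavaPairRDD<K, Tuple2<K, V>> keyed = input.mapToPair(
                new PairFunction<Tuple2<K, V>, K, Tuple2<K, V>>() {
                    public Tuple2<K, Tuple2<K, V>> call(Tuple2<K, V> t) {
                        return new Tuple2<K, Tuple2<K, V>>(t._1(), t);
                    }
                });

        // step 2: combineByKey builds one Agg per key without ever
        // materialising the full list of values for that key
        return keyed.combineByKey(
                new Function<Tuple2<K, V>, Agg<K>>() {              // createCombiner
                    public Agg<K> call(Tuple2<K, V> t) {
                        Agg<K> agg = new Agg<K>(t._1());
                        agg.count = 1;
                        return agg;
                    }
                },
                new Function2<Agg<K>, Tuple2<K, V>, Agg<K>>() {     // mergeValue
                    public Agg<K> call(Agg<K> agg, Tuple2<K, V> t) {
                        agg.count += 1;
                        return agg;
                    }
                },
                new Function2<Agg<K>, Agg<K>, Agg<K>>() {           // mergeCombiners
                    public Agg<K> call(Agg<K> a, Agg<K> b) {
                        a.count += b.count;
                        return a;
                    }
                });
    }
}

The only trick is that mapToPair duplicates the key into the value, so the
combine functions can read it without needing any extra API support.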

I hope that helps. If not, perhaps you can sketch out in more detail what
you're trying to accomplish and I or someone else can guide you through.

Cheers,
Y


Re: Reproducing the function of a Hadoop Reducer

Posted by Steve Lewis <lo...@gmail.com>.
OK, so in Java - pardon the verbosity - I might say something like the code
below, but I face the following issues:
1) I need to store all values in memory as I run combineByKey - if I could
return an RDD which consumed values that would be great, but I don't know
how to do that
2) in my version of the functions I get a tuple, so I know the key, but
Scala's byKey functions do not make the key available - this may work for a
trivial function like word count, but the code I want to port needs to know
the key when processing values
3) it is important that I have control over partitioning - I can do that
with mapPartitions - but it is also important that within a partition keys
be received in sorted order - easy if every partition could be a separate
RDD, combined later, but I am not sure how that works.

In Hadoop, when I reduce the values for each key I get an iterator and do
not need to keep all values in memory. Similarly, while in Hadoop the
output is written to disk as key-value pairs, in Spark it could populate a
JavaPairRDD if there were a way to do that lazily.

One other issue - I don't see a good way to tell that a merge function is
finished, i.e. that no further data is coming in, which would be useful in
processing steps.



    // these static nested classes assume the following imports at the top
    // of the enclosing file:
    //   import java.io.Serializable;
    //   import java.util.ArrayList;
    //   import org.apache.spark.api.java.function.Function;
    //   import org.apache.spark.api.java.function.Function2;
    //   import scala.Tuple2;

    /**
     * a class to store a key and all its values using an array list
     * (Serializable because the combined values cross the shuffle)
     * @param <K> key type
     * @param <V> value type
     */
    public static class KeyAndValues<K, V> implements Serializable {
        public final K key;
        private final ArrayList<V> values = new ArrayList<V>();

        public KeyAndValues(final K pKey) {
            key = pKey;
        }

        public void addValue(V added) {
            values.add(added);
        }

        public Iterable<V> getIterable() {
            return values;
        }

        public KeyAndValues<K, V> merge(KeyAndValues<K, V> merged) {
            values.addAll(merged.values);
            return this;
        }
    }

    // start (createCombiner) function for combineByKey - gets the key from the first tuple
    public static class CombineStartKeyAndValues<K, V> implements
            Function<Tuple2<K, V>, KeyAndValues<K, V>> {
        public KeyAndValues<K, V> call(Tuple2<K, V> x) {
            KeyAndValues<K, V> ret = new KeyAndValues<K, V>(x._1());
            ret.addValue(x._2());
            return ret;
        }
    }

    // continue (mergeValue) function for combineByKey - adds a value to the array
    public static class CombineContinueKeyAndValues<K, V> implements
            Function2<KeyAndValues<K, V>, Tuple2<K, V>, KeyAndValues<K, V>> {
        public KeyAndValues<K, V> call(final KeyAndValues<K, V> kvs,
                                       final Tuple2<K, V> added) throws Exception {
            kvs.addValue(added._2());
            return kvs;
        }
    }

    // merge (mergeCombiners) function - merges the accumulated arrays
    // NOTE there is still no signal to say the merge for a key is finished
    public static class CombineMergeKeyAndValues<K, V> implements
            Function2<KeyAndValues<K, V>, KeyAndValues<K, V>, KeyAndValues<K, V>> {
        public KeyAndValues<K, V> call(final KeyAndValues<K, V> v1,
                                       final KeyAndValues<K, V> v2) throws Exception {
            return v1.merge(v2);
        }
    }
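
I imagine wiring these up something like the sketch below (untested - the
local-mode driver, the sample data and the String types are just for
illustration). It assumes the key is first projected into the value so the
start function can see it:

    // assumes this main lives in the same class as the helper classes above, plus:
    //   import java.util.Arrays;
    //   import org.apache.spark.SparkConf;
    //   import org.apache.spark.api.java.JavaPairRDD;
    //   import org.apache.spark.api.java.JavaSparkContext;
    //   import org.apache.spark.api.java.function.PairFunction;
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("KeyAndValues-sketch").setMaster("local"));

        JavaPairRDD<String, String> input = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("k2", "line two"),
                new Tuple2<String, String>("k1", "line one"),
                new Tuple2<String, String>("k1", "line three")));

        // project the key into the value, (K, V) -> (K, (K, V)), so the
        // start/continue functions can see which key they are working on
        JavaPairRDD<String, Tuple2<String, String>> keyed = input.mapToPair(
                new PairFunction<Tuple2<String, String>, String, Tuple2<String, String>>() {
                    public Tuple2<String, Tuple2<String, String>> call(Tuple2<String, String> t) {
                        return new Tuple2<String, Tuple2<String, String>>(t._1(), t);
                    }
                });

        JavaPairRDD<String, KeyAndValues<String, String>> combined = keyed.combineByKey(
                new CombineStartKeyAndValues<String, String>(),
                new CombineContinueKeyAndValues<String, String>(),
                new CombineMergeKeyAndValues<String, String>());

        // keys come back in sorted order, but each KeyAndValues still holds
        // every value for its key in memory - which is exactly my problem
        for (Tuple2<String, KeyAndValues<String, String>> t : combined.sortByKey().collect()) {
            System.out.println(t._1() + " -> " + t._2().getIterable());
        }
        sc.stop();
    }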



-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Re: Reproducing the function of a Hadoop Reducer

Posted by Victor Tso-Guillen <vt...@paxata.com>.
So sorry about teasing you with the Scala. But the method is there in Java
too, I just checked.
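
Roughly, the shape of the call in Java is the sketch below (not something
I have run; the String/Integer types and the integer sum are only
stand-ins for your real zero/reduce/merge logic):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

public class CombineByKeyJavaSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("combineByKey-sketch").setMaster("local"));

        JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 3)));

        JavaPairRDD<String, Integer> reducedUnsorted = rdd.combineByKey(
                new Function<Integer, Integer>() {            // zero / createCombiner
                    public Integer call(Integer v) { return v; }
                },
                new Function2<Integer, Integer, Integer>() {  // reduce / mergeValue
                    public Integer call(Integer agg, Integer v) { return agg + v; }
                },
                new Function2<Integer, Integer, Integer>() {  // merge / mergeCombiners
                    public Integer call(Integer a, Integer b) { return a + b; }
                });

        System.out.println(reducedUnsorted.sortByKey().collect()); // [(a,4), (b,2)]
        sc.stop();
    }
}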


Re: Reproducing the function of a Hadoop Reducer

Posted by Victor Tso-Guillen <vt...@paxata.com>.
It might not be the same as a real Hadoop reducer, but I think it would
accomplish the same thing. Take a look at:

import org.apache.spark.SparkContext._
// val rdd: RDD[(K, V)]
// def zero(value: V): S
// def reduce(agg: S, value: V): S
// def merge(agg1: S, agg2: S): S
val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[S](zero, reduce, merge)
reducedUnsorted.sortByKey()
