Posted to dev@crunch.apache.org by Gabriel Reid <ga...@gmail.com> on 2012/06/28 23:11:01 UTC

Object reuse in Reducer and impact in Crunch

Hi guys, 

As you may have seen, the topic of the PTable#collectValues method came up today in the user mailing list. I hadn't been aware of this method before, and when I took a closer look I saw that it just creates a Collection of values based on the incoming Iterable, without doing any kind of a deep copy of the contents of the Iterable. As far as I can see, something similar (i.e. holding on to values from an Iterable from a reducer) is also done in the Join methods.

As Christian also pointed out (and added to the documentation for DoFn), this can be an issue, as values made available as an Iterable in a reducer are re-used within Hadoop.
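The effect can be reproduced without Hadoop. The following is only a sketch: ReuseDemo, Holder and reusingIterable are made-up names that imitate an Iterable handing out one mutable instance over and over, which is the behaviour described above. Nothing here is Crunch or Hadoop code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReuseDemo {

  /** Mutable holder standing in for a Writable value (hypothetical). */
  public static final class Holder {
    public String value;
  }

  /** Returns the same Holder from every next() call, overwriting its contents. */
  public static Iterable<Holder> reusingIterable(final List<String> raw) {
    return new Iterable<Holder>() {
      @Override
      public Iterator<Holder> iterator() {
        final Iterator<String> source = raw.iterator();
        final Holder shared = new Holder();
        return new Iterator<Holder>() {
          @Override public boolean hasNext() { return source.hasNext(); }
          @Override public Holder next() { shared.value = source.next(); return shared; }
          @Override public void remove() { throw new UnsupportedOperationException(); }
        };
      }
    };
  }

  /** Holds on to the references without copying, then reads them afterwards. */
  public static List<String> collectNaively(Iterable<Holder> values) {
    List<Holder> held = new ArrayList<Holder>();
    for (Holder h : values) {
      held.add(h); // every element is the same object
    }
    List<String> out = new ArrayList<String>();
    for (Holder h : held) {
      out.add(h.value);
    }
    return out;
  }

  public static void main(String[] args) {
    // prints [e, e, e, e]
    System.out.println(collectNaively(reusingIterable(Arrays.asList("b", "c", "a", "e"))));
  }
}
```

Every entry in the collected list points at the one shared object, so only the last value written to it is visible.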

This object re-use isn't a problem in Crunch wherever a non-identity mapping is used between the serialization type and the PCollection type within the PType (for example, with primitives and String). However, using Writable types or non-mapped Avro types won't work (as shown in the attached test case).

I think it's definitely a problem that PTable#collectValues (and probably some other methods) doesn't work for Writables, or in a broader sense, that the semantics can change for the Iterable that is passed in when processing a grouped table.

One really easy (but also inefficient) way we could solve this would be to not use an IdentityFn as the default mapping function in Writables and AvroType, and instead use a MapFn that does a deep copy of the object (i.e. by serializing and deserializing it in memory). This is of course a pretty big overhead for something that isn't necessary in a lot of cases.
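A rough sketch of what such an in-memory round trip could look like is below. To keep it compilable without Hadoop on the classpath, SimpleWritable is a stand-in interface with the same two methods as org.apache.hadoop.io.Writable; DeepCopyDemo, StringValue and deepCopy are illustrative names, not existing Crunch API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class DeepCopyDemo {

  /** Stand-in for Hadoop's Writable so the sketch has no external dependencies. */
  public interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
  }

  /** Minimal value type, modelled on the SimpleStringWritable in the test case. */
  public static final class StringValue implements SimpleWritable {
    public String value;
    public StringValue() {}
    public StringValue(String value) { this.value = value; }
    @Override public void write(DataOutput out) throws IOException { out.writeUTF(value); }
    @Override public void readFields(DataInput in) throws IOException { value = in.readUTF(); }
  }

  /** Serializes source into a byte buffer and reads the bytes back into target. */
  public static <T extends SimpleWritable> T deepCopy(T source, T target) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      source.write(out);
      out.flush();
      target.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return target;
    } catch (IOException e) {
      // In-memory streams should not fail; rethrow unchecked to keep callers simple.
      throw new UncheckedIOException(e);
    }
  }
}
```

Doing this for every value that passes through the mapping function is where the overhead comes from: each record pays for a write, a buffer allocation and a read, whether or not anyone holds on to it.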

Another option I was considering was to do something like making the input and output PTypes of a DoFn available to the DoFn, and adding a createDetachedValue method (or something similar) to PType, which would then serialize and deserialize objects in order to make a clone if necessary. With this approach, the clone method would have to be called within the collectValues method (or any other method that is holding on to values outside of the iterator).
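To make the second idea a bit more concrete, here is a sketch under stated assumptions. DetachingType is a hypothetical one-method slice of PType, createDetachedValue is the method name floated above, and the copy is done with a constructor rather than a serialization round trip purely to keep the example short. The reusing Iterable is simulated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

public class DetachDemo {

  /** Hypothetical slice of PType: just the detach hook proposed above. */
  public interface DetachingType<T> {
    T createDetachedValue(T value);
  }

  /** Mutable value standing in for a Writable. */
  public static final class Holder {
    public String value;
    public Holder(String value) { this.value = value; }
  }

  /** Identity-mapped type: instances may be reused, so detaching has to copy. */
  public static final DetachingType<Holder> HOLDERS = new DetachingType<Holder>() {
    @Override
    public Holder createDetachedValue(Holder value) {
      return new Holder(value.value);
    }
  };

  /** Collects values that outlive the iteration, detaching each one first. */
  public static <T> Collection<T> collectValues(Iterable<T> values, DetachingType<T> type) {
    List<T> detached = new ArrayList<T>();
    for (T value : values) {
      detached.add(type.createDetachedValue(value));
    }
    return detached;
  }

  /** Simulates a reducer-style Iterable that overwrites one shared Holder. */
  public static Iterable<Holder> reusingIterable(final List<String> raw) {
    return new Iterable<Holder>() {
      @Override
      public Iterator<Holder> iterator() {
        final Iterator<String> source = raw.iterator();
        final Holder shared = new Holder(null);
        return new Iterator<Holder>() {
          @Override public boolean hasNext() { return source.hasNext(); }
          @Override public Holder next() { shared.value = source.next(); return shared; }
          @Override public void remove() { throw new UnsupportedOperationException(); }
        };
      }
    };
  }

  /** Runs collectValues over the simulated Iterable and unwraps the results. */
  public static List<String> collectedStrings(List<String> raw) {
    List<String> out = new ArrayList<String>();
    for (Holder h : collectValues(reusingIterable(raw), HOLDERS)) {
      out.add(h.value);
    }
    return out;
  }

  public static void main(String[] args) {
    // prints [b, c, a, e]
    System.out.println(collectedStrings(Arrays.asList("b", "c", "a", "e")));
  }
}
```

The copy is only paid for in code that actually keeps values beyond the iteration; a type whose values are already immutable (String, boxed primitives) could implement the hook by returning its argument unchanged.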

I prefer the second approach, as it avoids the waste of extra cloning/serialization while still making it possible to get detached values out of an Iterable.

Does anyone else have any thoughts on this?

- Gabriel 


Re: Object reuse in Reducer and impact in Crunch

Posted by Josh Wills <jo...@gmail.com>.
+1. Thanks.
On Jun 29, 2012 2:48 AM, "Gabriel Reid" <ga...@gmail.com> wrote:

> Yep, that's an interesting one as well. I'll fix the value re-use
> issue with the change to PType if that sounds ok to you (or anyone
> else), and I'll take a look at what can be done about this as well if
> possible.
>
> - Gabriel

Re: Object reuse in Reducer and impact in Crunch

Posted by Gabriel Reid <ga...@gmail.com>.
Yep, that's an interesting one as well. I'll fix the value re-use
issue with the change to PType if that sounds ok to you (or anyone
else), and I'll take a look at what can be done about this as well if
possible.

- Gabriel

On Thu, Jun 28, 2012 at 11:41 PM, Josh Wills <jo...@gmail.com> wrote:
> Yeah, that's no good. I had a similar case w/a project my intern was
> working on, where he created a PType that was:
>
> PType<Pair<K, V>> kv = pairs(strings(), strings());
> ...
> tableOf(kv, kv);
>
> ... which also fails because the output mapfn returned by kv is
> stateful. Easy to remedy it in the code, but confusing for the user in
> the same way. It would be nice for the PType to know if their MapFns
> were stateful so that a new instance of them could be returned each
> time PType.getInputMapFn or getOutputMapFn was called.

Re: Object reuse in Reducer and impact in Crunch

Posted by Josh Wills <jo...@gmail.com>.
Yeah, that's no good. I had a similar case w/a project my intern was
working on, where he created a PType that was:

PType<Pair<K, V>> kv = pairs(strings(), strings());
...
tableOf(kv, kv);

... which also fails because the output mapfn returned by kv is
stateful. Easy to remedy it in the code, but confusing for the user in
the same way. It would be nice for a PType to know whether its MapFns
are stateful, so that a new instance could be returned each
time PType.getInputMapFn or getOutputMapFn is called.
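A toy illustration of how sharing one stateful function between key and value can go wrong, and how handing out a fresh instance per call avoids it, is sketched below. StatefulFnDemo, Wrapper and ReusingFn are invented for the example and are not meant to describe the actual Crunch MapFn internals.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class StatefulFnDemo {

  /** Mutable output record standing in for a reused serialization wrapper. */
  public static final class Wrapper {
    public String payload;
  }

  /** Stateful function: returns the same Wrapper instance from every call. */
  public static final class ReusingFn implements Function<String, Wrapper> {
    private final Wrapper reused = new Wrapper();

    @Override
    public Wrapper apply(String input) {
      reused.payload = input;
      return reused;
    }
  }

  /** Maps key and value, then reports what each output holds once both are mapped. */
  public static String mapEntry(Function<String, Wrapper> keyFn,
                                Function<String, Wrapper> valueFn,
                                String key, String value) {
    Wrapper k = keyFn.apply(key);
    Wrapper v = valueFn.apply(value);
    return k.payload + "=" + v.payload;
  }

  /** One instance used for both sides: the value overwrites the key. */
  public static String withSharedInstance() {
    ReusingFn shared = new ReusingFn();
    return mapEntry(shared, shared, "k", "v");
  }

  /** A factory hands out a fresh instance per request, so the state is not shared. */
  public static String withFreshInstances() {
    Supplier<ReusingFn> factory = ReusingFn::new;
    return mapEntry(factory.get(), factory.get(), "k", "v");
  }

  public static void main(String[] args) {
    System.out.println(withSharedInstance()); // prints v=v
    System.out.println(withFreshInstances()); // prints k=v
  }
}
```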

On Thu, Jun 28, 2012 at 2:32 PM, Gabriel Reid <ga...@gmail.com> wrote:
> Hmm, strange...according to my mail client the attachment was in
> there. Anyhow, I've pasted it inline below:

Re: Object reuse in Reducer and impact in Crunch

Posted by Gabriel Reid <ga...@gmail.com>.
Hmm, strange...according to my mail client the attachment was in
there. Anyhow, I've pasted it inline below:

package com.cloudera.crunch;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.Writable;
import org.junit.Test;

import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.writable.Writables;
import com.google.common.collect.Lists;

public class SerializationReducerTest implements Serializable {

  public static class SimpleStringWritable implements Writable {

    private String value;

    public void setValue(String value) {
      this.value = value;
    }

    public String getValue() {
      return value;
    }

    @Override
    public String toString() {
      return String.format("SimpleStringWritable(%s)", value);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.value = in.readUTF();
    }

  }

  static SimpleStringWritable asSimple(String value) {
    SimpleStringWritable simpleStringWritable = new SimpleStringWritable();
    simpleStringWritable.setValue(value);
    return simpleStringWritable;
  }

  static List<String> simplesToList(Collection<SimpleStringWritable> simpleCollection) {
    List<String> stringList = Lists.newArrayList();
    for (SimpleStringWritable writable : simpleCollection) {
      stringList.add(writable.getValue());
    }
    Collections.sort(stringList);
    return stringList;
  }

  @Test
  public void testWritables() throws IOException {
    Pipeline pipeline = new MRPipeline(SerializationReducerTest.class);
    Map<Integer, Collection<SimpleStringWritable>> collectionMap = pipeline
        .readTextFile(FileHelper.createTempCopyOf("set1.txt"))
        .parallelDo(new MapFn<String, Pair<Integer, SimpleStringWritable>>() {

          @Override
          public Pair<Integer, SimpleStringWritable> map(String input) {
            return Pair.of(1, asSimple(input));
          }

        }, Writables.tableOf(Writables.ints(),
            Writables.writables(SimpleStringWritable.class))

        ).collectValues().materializeToMap();

    assertEquals(1, collectionMap.size());

    // The actual content will just be ["e", "e", "e", "e"]
    assertEquals(Lists.newArrayList("a", "b", "c", "e"),
        simplesToList(collectionMap.get(1)));
  }


}


On Thu, Jun 28, 2012 at 11:28 PM, Josh Wills <jw...@cloudera.com> wrote:
> Gabriel,
>
> Generally agree with your line of thought-- where is the attached test case?
>
> J

Re: Object reuse in Reducer and impact in Crunch

Posted by Josh Wills <jw...@cloudera.com>.
Gabriel,

Generally agree with your line of thought-- where is the attached test case?

J

-- 
Director of Data Science
Cloudera
Twitter: @josh_wills