Posted to user@crunch.apache.org by Everett Anderson <ev...@nuna.com> on 2016/07/05 22:39:52 UTC

Hadoop InputFormat/RecordReader and Writable reuse

Hey,

I recently implemented a Hadoop InputFormat that returns the raw bytes of
each record as a BytesWritable rather than as Text (as in TextInputFormat,
which assumes that the input is UTF-8).

One thing I noticed is that Hadoop RecordReader implementations generally
re-use the Writable instance across multiple {getCurrentKey() +
getCurrentValue()} calls for efficiency, though this isn't documented.
See, for example:

  https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/mapreduce/RecordReader.html
  https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java#L178
  https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java#L118
  https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L214
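
To make the reuse concrete, here is a minimal sketch of a reader that
follows this pattern. It reads from an in-memory list rather than a real
split, and the class name is made up for illustration; it is not one of
the readers linked above:

  import java.io.IOException;
  import java.util.Iterator;
  import java.util.List;

  import org.apache.hadoop.io.BytesWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hadoop.mapreduce.RecordReader;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;

  public class ReusingBytesRecordReader
      extends RecordReader<LongWritable, BytesWritable> {
    // Allocated once; overwritten on every nextKeyValue() call.
    private final LongWritable key = new LongWritable();
    private final BytesWritable value = new BytesWritable();
    private final Iterator<byte[]> records;
    private long offset = 0;

    public ReusingBytesRecordReader(List<byte[]> records) {
      this.records = records.iterator();
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {}

    @Override
    public boolean nextKeyValue() {
      if (!records.hasNext()) {
        return false;
      }
      byte[] record = records.next();
      key.set(offset);
      offset += record.length;
      value.set(record, 0, record.length);  // copies into the same buffer
      return true;
    }

    @Override
    public LongWritable getCurrentKey() {
      return key;    // the same instance, every record
    }

    @Override
    public BytesWritable getCurrentValue() {
      return value;  // the same instance, every record
    }

    @Override
    public float getProgress() {
      return 0f;
    }

    @Override
    public void close() throws IOException {}
  }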

Crunch handles this for Text because Writables.strings() uses this
converter:

  private static final MapFn<Text, String> TEXT_TO_STRING =
      new MapFn<Text, String>() {
        @Override
        public String map(Text input) {
          return input.toString();
        }
      };

and toString() will create a copy of Text's data.

However, here is the corresponding map implementation for Writables.bytes():

  private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB =
      new MapFn<BytesWritable, ByteBuffer>() {
        @Override
        public ByteBuffer map(BytesWritable input) {
          return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
        }
      };

Since ByteBuffer.wrap() still references the BytesWritable's internal
state, and the BytesWritable instance is reused across multiple records,
this causes problems in Crunch whenever the BytesWritable came from a
RecordReader.
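
You can see the aliasing even without Crunch in the picture; here is a
small self-contained demonstration (the class name is mine, just for
illustration):

  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.io.BytesWritable;

  public class AliasingDemo {
    public static void main(String[] args) {
      // Stands in for the single value instance a RecordReader reuses.
      BytesWritable reused = new BytesWritable();
      List<ByteBuffer> seen = new ArrayList<ByteBuffer>();

      for (byte[] record : new byte[][] { {1, 1, 1}, {2, 2, 2} }) {
        reused.set(record, 0, record.length);  // overwrites the same buffer
        seen.add(ByteBuffer.wrap(reused.getBytes(), 0, reused.getLength()));
      }

      // Both print 2: each ByteBuffer wraps the same backing array, which
      // now holds only the last record's bytes.
      System.out.println(seen.get(0).get(0));
      System.out.println(seen.get(1).get(0));
    }
  }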

One work-around is to construct a new WritableType that uses a MapFn that
creates a copy of the data, and to use it only when reading from a Hadoop
InputFormat that returns a BytesWritable.
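
For reference, that work-around might look something like the following,
expressed here with Writables.derived() rather than a hand-built
WritableType (a sketch only; I'm assuming the derived() factory is
available in your Crunch version, and CopyingBytes/copyingBytes() are
names I made up):

  import java.nio.ByteBuffer;
  import java.util.Arrays;

  import org.apache.crunch.MapFn;
  import org.apache.crunch.types.PType;
  import org.apache.crunch.types.writable.Writables;
  import org.apache.hadoop.io.BytesWritable;

  public class CopyingBytes {
    // Unlike BW_TO_BB above, this copies the record out of the (possibly
    // reused) BytesWritable before wrapping it.
    private static final MapFn<BytesWritable, ByteBuffer> COPYING_BW_TO_BB =
        new MapFn<BytesWritable, ByteBuffer>() {
          @Override
          public ByteBuffer map(BytesWritable input) {
            return ByteBuffer.wrap(
                Arrays.copyOf(input.getBytes(), input.getLength()));
          }
        };

    private static final MapFn<ByteBuffer, BytesWritable> BB_TO_BW =
        new MapFn<ByteBuffer, BytesWritable>() {
          @Override
          public BytesWritable map(ByteBuffer input) {
            BytesWritable bw = new BytesWritable();
            // Assumes an array-backed, non-read-only buffer.
            bw.set(input.array(), input.arrayOffset() + input.position(),
                input.remaining());
            return bw;
          }
        };

    public static PType<ByteBuffer> copyingBytes() {
      return Writables.derived(ByteBuffer.class, COPYING_BW_TO_BB, BB_TO_BW,
          Writables.writables(BytesWritable.class));
    }
  }

The output MapFn only matters on the write side; on the read side it's the
copy in COPYING_BW_TO_BB that breaks the aliasing.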

Is there a more general way to solve this?

Re: Hadoop InputFormat/RecordReader and Writable reuse

Posted by Josh Wills <jo...@gmail.com>.
That is correct, yes.

On Wed, Jul 6, 2016 at 10:07 AM, Everett Anderson <ev...@nuna.com> wrote:

> If I understand correctly, for non-immutable types, the recommendation is
> to be aware of this and either add a DoFn that copies the data right
> after read() or use a PType that does this. Is that right?
>
> [...]

Re: Hadoop InputFormat/RecordReader and Writable reuse

Posted by Everett Anderson <ev...@nuna.com>.
On Tue, Jul 5, 2016 at 10:06 PM, Josh Wills <jo...@gmail.com> wrote:

> That _may_ cause problems in Crunch if the DoFns that are processing those
> ByteBuffers don't convert them to an immutable data type, or if they need
> to cache some of those values along the way during processing.
> [...]

If I understand correctly, for non-immutable types, the recommendation is
to be aware of this and either add a DoFn that copies the data right after
read() or use a PType that does this. Is that right?

For example (though if you were using Text, you should probably just use
Writables.strings() to avoid this), this won't work:

    TableSource<LongWritable, Text> source =
        From.formattedFile("/path/to/textfile",
                           TextInputFormat.class,
                           Writables.writables(LongWritable.class),
                           Writables.writables(Text.class));

    PTable<LongWritable, Text> data = getPipeline().read(source);

    for (Pair<LongWritable, Text> pair : data.materialize()) {
      System.out.println("offset: " + pair.first() +
                         ", text: " + pair.second().toString());
    }

but you could add something like this right after read() to force a copy
(which works because Crunch applies the copy before any serialization
boundary) --

    final PType<Pair<LongWritable, Text>> pType = data.getPType();
    pType.initialize(getPipeline().getConfiguration());

    data = data.parallelDo(
        new DoFn<Pair<LongWritable, Text>, Pair<LongWritable, Text>>() {
          @Override
          public void process(Pair<LongWritable, Text> input,
                              Emitter<Pair<LongWritable, Text>> emitter) {
            // getDetachedValue() deep-copies the pair, so the emitted
            // value no longer aliases the reader's reused Writables.
            emitter.emit(pType.getDetachedValue(input));
          }
        }, data.getPTableType());

Or you could create your own PTypes to give to From.formattedFile that make
copies.



Re: Hadoop InputFormat/RecordReader and Writable reuse

Posted by Josh Wills <jo...@gmail.com>.
That _may_ cause problems in Crunch if the DoFns that are processing those
ByteBuffers don't convert them to an immutable data type, or if they need
to cache some of those values along the way during processing. (If there
isn't any caching in the flow, processing the records one-at-a-time with
no deep copy is not normally an issue.) The best practice in Crunch for
those situations (i.e., non-immutable data plus some sort of
caching/maintenance of state) is to use the PType.getDetachedValue(obj)
function to do a deep copy of the value in a way that is independent of
the underlying data type (and a PType of an immutable object can be clever
and just return the value itself).
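
To illustrate the caching case, here is a sketch of a DoFn that buffers
values across process() calls and detaches each one before buffering it
(BufferingFn and the batch size of 100 are made up for the example):

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.crunch.DoFn;
  import org.apache.crunch.Emitter;
  import org.apache.crunch.types.PType;

  public class BufferingFn<T> extends DoFn<T, T> {
    private final PType<T> ptype;
    private transient List<T> buffer;

    public BufferingFn(PType<T> ptype) {
      this.ptype = ptype;
    }

    @Override
    public void initialize() {
      ptype.initialize(getConfiguration());
      buffer = new ArrayList<T>();
    }

    @Override
    public void process(T input, Emitter<T> emitter) {
      // Detach before buffering: otherwise every entry in the buffer may
      // alias the single object the RecordReader keeps overwriting.
      buffer.add(ptype.getDetachedValue(input));
      if (buffer.size() >= 100) {
        flush(emitter);
      }
    }

    @Override
    public void cleanup(Emitter<T> emitter) {
      flush(emitter);
    }

    private void flush(Emitter<T> emitter) {
      for (T value : buffer) {
        emitter.emit(value);
      }
      buffer.clear();
    }
  }

For a PType of an immutable object, getDetachedValue can just return its
input, so the detach is cheap exactly when the copy is unnecessary.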
