Posted to user@crunch.apache.org by Ashish <pa...@gmail.com> on 2012/12/26 13:54:59 UTC

Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text

Folks,

I was trying to port the word co-occurrence example (using pairs) to Crunch,
and had used the well-known TextPair class from Hadoop: The Definitive Guide.
While running it, I get this error:

ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException:
java.io.NotSerializableException: org.apache.hadoop.io.Text

As an alternative, I created a WordPair class that uses String instead of
Text and implements both Serializable and WritableComparable. That version
worked.
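
For reference, here is a trimmed-down sketch of that WordPair (details of my
actual class may differ slightly):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.io.WritableComparable;

public class WordPair implements WritableComparable<WordPair>, Serializable {
    // Plain Strings serialize fine with Java serialization, unlike Text
    private String first = "";
    private String second = "";

    public WordPair() {}  // no-arg constructor required by Writable

    public void set(String first, String second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeUTF(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readUTF();
        second = in.readUTF();
    }

    @Override
    public int compareTo(WordPair other) {
        int cmp = first.compareTo(other.first);
        return (cmp != 0) ? cmp : second.compareTo(other.second);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WordPair)) return false;
        WordPair w = (WordPair) o;
        return first.equals(w.first) && second.equals(w.second);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }
}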

Is this behavior expected, or am I missing something?


-- 
thanks
ashish

Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal

Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text

Posted by Josh Wills <jw...@cloudera.com>.
Closures can be a little tricky to serialize, because it's not clear how much
of their surrounding state they need to serialize as part of themselves... I
don't know enough Groovy to know how best to handle that.
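
In Java terms, the analogous pitfall is an anonymous inner DoFn keeping an
implicit reference to its enclosing instance (which in your case holds the
MRPipeline). A standalone or static nested class avoids that, since it can
only drag in its own fields. A rough sketch, with made-up names:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

// Top-level class: no hidden reference to the enclosing script/pipeline,
// so only its own (serializable) fields travel with it to the tasks.
public class PrefixFn extends DoFn<String, String> {
    private final String prefix;  // plain serializable state, passed in

    public PrefixFn(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public void process(String input, Emitter<String> emitter) {
        emitter.emit(prefix + input);
    }
}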


On Tue, Feb 12, 2013 at 3:01 PM, Mike Barretta <mi...@gmail.com> wrote:

> Similar to the OP, getting this:
> ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException:
> java.io.NotSerializableException: org.apache.crunch.impl.mr.MRPipeline
>
> ...strange
>
> I do have a non-serializable component to assemble my objects from Thrift
> objects, but ended up extending it in to an object that is
> - WrappedAssembler in this case.  function() is a passed in Groovy closure
> which does the actual emitting.
>
> Groovy code is:
>
>         def results = crunchPipeline.read(source).parallelDo(
> this.class.name + ":" + table, new DoFn<Pair<ColumnKey,
> ColumnDataArrayWritable>, String>() {
>
>             @Override
>             void process(Pair<ColumnKey, ColumnDataArrayWritable> input,
> Emitter<String> emitter) {
>                 input.second().toArray().each {
>                     def obj = new
> WrappedAssembler().assemble([PetalUtils.toThrift(input.first(), it)])
>                     function(obj, emitter)
>                 }
>             }
>         }, Writables.strings())
>
>
> On Thu, Dec 27, 2012 at 1:20 PM, Gabriel Reid <ga...@gmail.com>wrote:
>
>> Hi Ashish,
>>
>> The full implementation of this kind of thing (the translation from
>> pipeline code into MapReduce jobs) is contained in the packages under
>> org.apache.crunch.impl.mr.
>>
>> Crunch has a single Mapper class (o.a.c.impl.mr.run.CrunchMapper) and
>> a single Reducer class (o.a.c.impl.mr.run.CrunchReducer). All DoFns
>> are executed within this Mapper and Reducer (plus CrunchCombiner), so
>> they are effectively wrappers around the execution of the DoFns (and I
>> would say that your understanding is indeed correct).
>>
>> - Gabriel
>>
>> On Thu, Dec 27, 2012 at 3:36 AM, Ashish <pa...@gmail.com> wrote:
>> > Thanks Gabriel !
>> >
>> > Where can I look in Crunch code for this. Its like Crunch has some
>> wrapper
>> > MapReduce functions and post the complete pipeline graph, it decides
>> which
>> > functions to run when, passing the params from Mapper to DoFn instance.
>> Is
>> > this understanding correct?
>> >
>> >
>> > On Wed, Dec 26, 2012 at 10:44 PM, Gabriel Reid <ga...@gmail.com>
>> > wrote:
>> >>
>> >> Hi Ashish,
>> >>
>> >> Your solution looks good -- indeed, any non-serializable members are
>> >> typically initialized in the initialize method.
>> >>
>> >> The way crunch works is that DoFn instances are serialized at the
>> client,
>> >> and then deserialized, initialized, and run within map and reduce
>> tasks. A
>> >> single map or reduce task will make use of one or more DoFn instances
>> (ie
>> >> they can be chained together within a single task).
>> >>
>> >> - Gabriel
>> >>
>> >>
>> >> On 26 Dec 2012, at 15:26, Ashish <pa...@gmail.com> wrote:
>> >>
>> >> Hi Gabriel,
>> >>
>> >> Bull's eye :) My code was holding reference to a non-transient Text
>> >> instance.
>> >>
>> >> Here is the culprit code
>> >>
>> >> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new
>> >> DoFn<String, Pair<TextPair, Long>>() {
>> >>             TextPair textPair = new TextPair();
>> >>             @Override
>> >>             public void process(String input, Emitter<Pair<TextPair,
>> >> Long>> emitter) {
>> >>
>> >>                 String[] words =  input.split("\\s+");
>> >>
>> >>                 for (int i = 0; i < words.length; i++) {
>> >>                     String word = words[i];
>> >>                     if(Strings.isNullOrEmpty(word)) {
>> >>                         continue;
>> >>                     }
>> >>
>> >>                     // lets look for neighbours now
>> >>                     int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0
>> : i
>> >> - DEFAULT_NEIGHBOUR_WINDOW;
>> >>                     int end = (i + DEFAULT_NEIGHBOUR_WINDOW >=
>> >> words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>> >>                     for(int j = start; j < end; j++) {
>> >>                         if(i == j) continue;
>> >>                         textPair.set(new Text(words[i]), new
>> >> Text(words[j]));
>> >>                         emitter.emit(Pair.of(textPair, 1L));
>> >>                     }
>> >>                 }
>> >>             }
>> >>         },
>> >> textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class),
>> >> Writables.longs()));
>> >>
>> >> And this is how I fixed it
>> >>
>> >> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new
>> >> DoFn<String, Pair<TextPair, Long>>() {
>> >>             transient TextPair textPair;
>> >>
>> >>             @Override
>> >>             public void initialize() {
>> >>                 super.initialize();
>> >>                 textPair = new TextPair();
>> >>             }
>> >>
>> >>             @Override
>> >>             public void process(String input, Emitter<Pair<TextPair,
>> >> Long>> emitter) {
>> >>                 String[] words =  input.split("\\s+");
>> >>
>> >>                 for (int i = 0; i < words.length; i++) {
>> >>                     String word = words[i];
>> >>                     if(Strings.isNullOrEmpty(word)) {
>> >>                         continue;
>> >>                     }
>> >>
>> >>                     // lets look for neighbours now
>> >>                     int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0
>> : i
>> >> - DEFAULT_NEIGHBOUR_WINDOW;
>> >>                     int end = (i + DEFAULT_NEIGHBOUR_WINDOW >=
>> >> words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>> >>                     for(int j = start; j < end; j++) {
>> >>                         if(i == j) continue;
>> >>                         textPair.set(new Text(words[i]), new
>> >> Text(words[j]));
>> >>                         emitter.emit(Pair.of(textPair, 1L));
>> >>                     }
>> >>                 }
>> >>             }
>> >>         },
>> >> textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class),
>> >> Writables.longs()));
>> >>
>> >> Would you please share how this part is converted to Hadoop Map
>> function?
>> >> Does crunch convert these function to normal MapReduce jobs or the
>> process
>> >> is more involved? I have to admit I coded this like I used to code
>> Mapper
>> >> functions.
>> >>
>> >> Appreciate your help.
>> >>
>> >>
>> >> On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <ga...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Hi Ashish,
>> >>>
>> >>> Are you holding on to a non-transient Text instance in a DoFn perhaps?
>> >>> DoFns need to remain serializable.
>> >>>
>> >>> Otherwise, could you post your (non-working) code (I'm assuming its
>> >>> pretty short).
>> >>>
>> >>> - Gabriel
>> >>>
>> >>>
>> >>> On 26 Dec 2012, at 13:54, Ashish <pa...@gmail.com> wrote:
>> >>>
>> >>> Folks,
>> >>>
>> >>> I was trying to port Word co-occurrence example(using Pairs) to
>> Crunch.
>> >>> Had used famous TextPair class from Hadoop Definitive Guide.
>> >>> While running getting this error
>> >>>
>> >>> ERROR mr.MRPipeline:
>> >>> org.apache.crunch.impl.mr.run.CrunchRuntimeException:
>> >>> java.io.NotSerializableException: org.apache.hadoop.io.Text
>> >>>
>> >>> As an alternative, I created WordPair class that uses String instead
>> of
>> >>> Text and implemented Serializable, WritableComparable. This piece
>> worked.
>> >>>
>> >>> Is this behavior expected or I am missing something?
>> >>>
>> >>>
>> >>> --
>> >>> thanks
>> >>> ashish
>> >>>
>> >>> Blog: http://www.ashishpaliwal.com/blog
>> >>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> thanks
>> >> ashish
>> >>
>> >> Blog: http://www.ashishpaliwal.com/blog
>> >> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>> >
>> >
>> >
>> >
>> > --
>> > thanks
>> > ashish
>> >
>> > Blog: http://www.ashishpaliwal.com/blog
>> > My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text

Posted by Mike Barretta <mi...@gmail.com>.
Similar to the OP, I'm getting this:
ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException:
java.io.NotSerializableException: org.apache.crunch.impl.mr.MRPipeline

...strange

I do have a non-serializable component that assembles my objects from Thrift
objects, but I ended up extending it into an object that is serializable
- WrappedAssembler in this case.  function() is a passed-in Groovy closure
that does the actual emitting.

Groovy code is:

        def results = crunchPipeline.read(source).parallelDo(
            this.class.name + ":" + table,
            new DoFn<Pair<ColumnKey, ColumnDataArrayWritable>, String>() {

                @Override
                void process(Pair<ColumnKey, ColumnDataArrayWritable> input,
                             Emitter<String> emitter) {
                    input.second().toArray().each {
                        def obj = new WrappedAssembler()
                                .assemble([PetalUtils.toThrift(input.first(), it)])
                        function(obj, emitter)
                    }
                }
            }, Writables.strings())


On Thu, Dec 27, 2012 at 1:20 PM, Gabriel Reid <ga...@gmail.com> wrote:


Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Ashish,

The full implementation of this kind of thing (the translation from
pipeline code into MapReduce jobs) is contained in the packages under
org.apache.crunch.impl.mr.

Crunch has a single Mapper class (o.a.c.impl.mr.run.CrunchMapper) and
a single Reducer class (o.a.c.impl.mr.run.CrunchReducer). All DoFns
are executed within this Mapper and Reducer (plus CrunchCombiner), so
they are effectively wrappers around the execution of the DoFns (and I
would say that your understanding is indeed correct).
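
Very roughly, what the wrapper does with each DoFn amounts to something like
this toy sketch (not the actual Crunch source -- names and structure here are
simplified for illustration):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

public class DoFnRunnerSketch {

    // "Client side": serialize the DoFn, as happens during job planning;
    // "task side": deserialize it, call initialize(), then process() per record.
    static <S, T> List<T> runLikeATask(DoFn<S, T> clientSideFn, List<S> records)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(clientSideFn);

        @SuppressWarnings("unchecked")
        DoFn<S, T> taskSideFn = (DoFn<S, T>) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();
        taskSideFn.initialize();  // transient fields get rebuilt here

        final List<T> collected = new ArrayList<T>();
        Emitter<T> emitter = new Emitter<T>() {
            public void emit(T emitted) { collected.add(emitted); }
            public void flush() {}
        };
        for (S record : records) {
            taskSideFn.process(record, emitter);
        }
        return collected;
    }
}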

- Gabriel

On Thu, Dec 27, 2012 at 3:36 AM, Ashish <pa...@gmail.com> wrote:

Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text

Posted by Ashish <pa...@gmail.com>.
Thanks Gabriel!

Where can I look in the Crunch code for this? It seems like Crunch has some
wrapper MapReduce functions, and once the complete pipeline graph is built,
it decides which functions to run when, passing the params from the Mapper
to the DoFn instances. Is this understanding correct?


On Wed, Dec 26, 2012 at 10:44 PM, Gabriel Reid <ga...@gmail.com> wrote:



-- 
thanks
ashish

Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal

Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Ashish,

Your solution looks good -- indeed, any non-serializable members are
typically initialized in the initialize method.

The way Crunch works is that DoFn instances are serialized at the client,
and then deserialized, initialized, and run within map and reduce tasks. A
single map or reduce task will make use of one or more DoFn instances (i.e.
they can be chained together within a single task).
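
A small habit that helps catch this early (my own helper, not a Crunch API):
Java-serialize the DoFn locally before building the pipeline -- it fails with
the same NotSerializableException you would otherwise only see at submit
time.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public final class SerializabilityCheck {
    // Throws NotSerializableException (a subclass of IOException) for the
    // same object graphs that the client-side serialization would reject.
    public static void check(Object doFn) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        out.writeObject(doFn);
        out.close();
    }
}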

- Gabriel

On 26 Dec 2012, at 15:26, Ashish <pa...@gmail.com> wrote:

> Hi Gabriel,
> 
> Bull's eye :) My code was holding reference to a non-transient Text instance.
> 
> Here is the culprit code
> 
> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new DoFn<String, Pair<TextPair, Long>>() {
>             TextPair textPair = new TextPair();
>             @Override
>             public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
> 
>                 String[] words =  input.split("\\s+");
> 
>                 for (int i = 0; i < words.length; i++) {
>                     String word = words[i];
>                     if(Strings.isNullOrEmpty(word)) {
>                         continue;
>                     }
> 
>                     // lets look for neighbours now
>                     int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>                     int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>                     for(int j = start; j < end; j++) {
>                         if(i == j) continue;
>                         textPair.set(new Text(words[i]), new Text(words[j]));
>                         emitter.emit(Pair.of(textPair, 1L));
>                     }
>                 }
>             }
>         }, textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
> 
> And this is how I fixed it
> 
> PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new DoFn<String, Pair<TextPair, Long>>() {
>             transient TextPair textPair;
> 
>             @Override
>             public void initialize() {
>                 super.initialize();
>                 textPair = new TextPair();
>             }
> 
>             @Override
>             public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
>                 String[] words =  input.split("\\s+");
> 
>                 for (int i = 0; i < words.length; i++) {
>                     String word = words[i];
>                     if(Strings.isNullOrEmpty(word)) {
>                         continue;
>                     }
> 
>                     // lets look for neighbours now
>                     int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>                     int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>                     for(int j = start; j < end; j++) {
>                         if(i == j) continue;
>                         textPair.set(new Text(words[i]), new Text(words[j]));
>                         emitter.emit(Pair.of(textPair, 1L));
>                     }
>                 }
>             }
>         }, textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
> 
> Would you please share how this part is converted to Hadoop Map function? Does crunch convert these function to normal MapReduce jobs or the process is more involved? I have to admit I coded this like I used to code Mapper functions.
> 
> Appreciate your help.
> 
> 
> On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <ga...@gmail.com> wrote:
>> Hi Ashish,
>> 
>> Are you holding on to a non-transient Text instance in a DoFn perhaps? DoFns need to remain serializable. 
>> 
>> Otherwise, could you post your (non-working) code (I'm assuming its pretty short). 
>> 
>> - Gabriel
>> 
>> 
>> On 26 Dec 2012, at 13:54, Ashish <pa...@gmail.com> wrote:
>> 
>>> Folks,
>>> 
>>> I was trying to port Word co-occurrence example(using Pairs) to Crunch. Had used famous TextPair class from Hadoop Definitive Guide.
>>> While running getting this error
>>> 
>>> ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException: java.io.NotSerializableException: org.apache.hadoop.io.Text
>>> 
>>> As an alternative, I created WordPair class that uses String instead of Text and implemented Serializable, WritableComparable. This piece worked.
>>> 
>>> Is this behavior expected or I am missing something?
>>> 
>>> 
>>> -- 
>>> thanks
>>> ashish
>>> 
>>> Blog: http://www.ashishpaliwal.com/blog
>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
> 
> 
> 
> -- 
> thanks
> ashish
> 
> Blog: http://www.ashishpaliwal.com/blog
> My Photo Galleries: http://www.pbase.com/ashishpaliwal

Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text

Posted by Ashish <pa...@gmail.com>.
Hi Gabriel,

Bull's eye :) My code was holding a reference to a non-transient Text
instance.

Here is the culprit code:

PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(
        new DoFn<String, Pair<TextPair, Long>>() {
            TextPair textPair = new TextPair();

            @Override
            public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
                String[] words = input.split("\\s+");

                for (int i = 0; i < words.length; i++) {
                    String word = words[i];
                    if (Strings.isNullOrEmpty(word)) {
                        continue;
                    }

                    // let's look for neighbours now
                    int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0)
                            ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
                    int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length)
                            ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
                    for (int j = start; j < end; j++) {
                        if (i == j) continue;
                        textPair.set(new Text(words[i]), new Text(words[j]));
                        emitter.emit(Pair.of(textPair, 1L));
                    }
                }
            }
        },
        textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class),
                Writables.longs()));

And this is how I fixed it:

PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(
        new DoFn<String, Pair<TextPair, Long>>() {
            transient TextPair textPair;

            @Override
            public void initialize() {
                super.initialize();
                textPair = new TextPair();
            }

            @Override
            public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
                String[] words = input.split("\\s+");

                for (int i = 0; i < words.length; i++) {
                    String word = words[i];
                    if (Strings.isNullOrEmpty(word)) {
                        continue;
                    }

                    // let's look for neighbours now
                    int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0)
                            ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
                    int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length)
                            ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
                    for (int j = start; j < end; j++) {
                        if (i == j) continue;
                        textPair.set(new Text(words[i]), new Text(words[j]));
                        emitter.emit(Pair.of(textPair, 1L));
                    }
                }
            }
        },
        textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class),
                Writables.longs()));

Would you please share how this part is converted to a Hadoop map function?
Does Crunch convert these functions into normal MapReduce jobs, or is the
process more involved? I have to admit I coded this the way I used to code
Mapper functions.

Appreciate your help.


On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <ga...@gmail.com> wrote:



-- 
thanks
ashish

Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal

Re: Facing error java.io.NotSerializableException: org.apache.hadoop.io.Text

Posted by Gabriel Reid <ga...@gmail.com>.
Hi Ashish,

Are you holding on to a non-transient Text instance in a DoFn, perhaps?
DoFns need to remain serializable.

Otherwise, could you post your (non-working) code? (I'm assuming it's
pretty short.)

- Gabriel
