You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@pig.apache.org by Daniel Eklund <do...@gmail.com> on 2011/06/09 13:53:03 UTC

Two questions: should I file a bug? and is this workaround performant?

Recently I uncovered a nasty situation in my data that caused an
IndexOutOfBoundsException.
I am including a sample pig script and data (at the bottom) that illuminate
the concern.

Succinctly:  records JOINed from one relation to another would throw an
IndexOutOfBoundsException if 1) the columns were derived from a PigStorage()
load of one large data:chararray followed by a STRSPLIT() of that data into
the proper amount of columns, an 2) there were bad records of insufficient
length (by STRSPLIT delimiter).

Why this is interesting is that if I were to use the PisgStorage() with the
delimiter directly, then the bad records would be silently dropped and the
JOIN would proceed WITHOUT throwing an exception (which is always good).

Once I discovered that the semantically equal (IMHO) notions of loading a
line as one big chararray and STRSPLITTING() on the delimiter is slightly
different from loading using the PigStorage() with the delimiter directly, I
realized I had to use a workaround as such:

     GOOD_RECORDS = FILTER RELATION_FROM_STRSPLIT by SIZE(*) == <my expected
column count>;

This was a silver lining of sorts as now I could something like

SPLIT RELATION_FROM_STRSPLIT into
     GOOD_RECORDS if SIZE(*) == <my expected column count>,
     BAD_RECORDS  if SIZE(*) != <my expected column count>;

and store the bad records for later analysis and remediation.

So, my questions:  Firstly, I feel I should file a bug for the exception
(they just never are a good thing to see).  Secondly, I am thinking of
applying the "load first, STRSPLIT second" pattern consistenly whenever I
load my data, as it allows me the ability to report out on bad data.

What does everyone feel about the performance of such a pattern?  I would
think that the difference  should be negligible.

thanks for any insight,
daniel


pig script
-----------

my_data = LOAD 'test.txt' using PigStorage(',')
      as       (age        :int,
             eye_color    :chararray,
             height          :int,
             name           :chararray);


my_data_raw =  LOAD 'test.txt' as (data:chararray);
my_data_from_split = FOREACH my_data_raw generate
           FLATTEN(STRSPLIT(data,','))
            as    (age        :int,
             eye_color    :chararray,
             height        :int,
             name        :chararray);


my_names = LOAD 'name.txt' using PigStorage(',')
      as         (name_key    :chararray,
             first        :chararray,
             last        :chararray);

-- this one has no exception
joined = JOIN my_data by name,
                 my_names by name_key;

-- this one throws an exception
bad_joined = JOIN my_data_from_split by name,
                 my_names by name_key;


-------- Sameple test.txt ----
24,brown,56,daniel
24,blue,57,janice
34,blue,23,arthi
43,blue,53,john
33,brown,23,apu
33,brown,64,ponce
34,green,23,jeaninine
25,brown,23,rachael
35,brown,43,Wolde
32,brown,33,gregory
35,brown,53,vlad
23,brown,64,emilda
33,blue,43,ravi
33,green,53,brendan
15,blue,43,ravichandra
15,brown,46,leonor
18,blue,23,caeser
23,JCVD             <-- here is the bad data
33,blue,46,anthony
23,blue,13,xavier
18,blue,33,patrick
33,brown,44,sang
18,brown,45,ari
24,green,46,vance
33,brown,23,qi
29,green,24,eloise
33,blue ,29,elaine



--- Exception thrown ---
java.lang.IndexOutOfBoundsException: Index: 14, Size: 14
    at java.util.ArrayList.RangeCheck(ArrayList.java:547)
    at java.util.ArrayList.get(ArrayList.java:322)
    at org.apache.pig.data.DefaultTuple.get(DefaultTuple.java:158)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getValueTuple(POFRJoin.java:403)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:261)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit.getNext(POLimit.java:85)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
    at
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:256)
    at
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:236)
    at
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:231)
    at
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:646)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:322)
    at
org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)

Re: Two questions: should I file a bug? and is this workaround performant?

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Hm, I read your script a bit more carefully -- basically, ignoring
strsplit for a second, what you want is for this:
 FLATTEN(STRSPLIT(data,','))
           as    (age        :int,
            eye_color    :chararray,
            height        :int,
            name        :chararray);

to inject null fields and make the result match the schema, the same
way as load using PigStorage as (...) does, or to throw some sort of
runtime error along the lines of "actual data doesn't match schema."

I think that's reasonable.


D


On Thu, Jun 9, 2011 at 3:13 PM, Daniel Eklund <do...@gmail.com> wrote:
> i think i see what you're saying about STRSPLIT()... if i use it without the
> # of maxsplits, it returns as many as it can find...
> so with PigStorage i get
>    (23,JCVD,  , )
> but with the unspecified maxsplits I get
>    (23,JCVD)
> so, I could use maxsplits as equal to the number of columns I am expecting.
>
> understanding a bit better now
>
> On Thu, Jun 9, 2011 at 6:03 PM, Daniel Eklund <do...@gmail.com> wrote:
>
>> right, but I am not _explicitly_ accessing by index.. it's the byproduct of
>> the JOIN.
>>
>> I think the join operator could assert for the column to join on, if it
>> exists and silently fail if it doesn't (kinda like how PigStorage silently
>> fails to load those records that are inconsistent with the schema).
>>
>> thanks for the feedback on the performance
>>
>>
>> On Thu, Jun 9, 2011 at 5:57 PM, Dmitriy Ryaboy <dv...@gmail.com> wrote:
>>
>>> I think this is one of those "works as designed" cases.
>>>
>>> PigStorage splits by a character, and returns the fields that are
>>> generated as a result. If you give pigstorage a schema, it will create
>>> as many columns as you specify -- padding nulls as needed, and
>>> dropping extra columns as required, to match the schema you dictate.
>>> So, no surprises there.
>>>
>>> STRSPLIT returns a variable number of fields, depending on how many
>>> occur. No surprises there, either. One could write a STRSPLIT
>>> equivalent that is given a number of fields to return, and make it
>>> behave like PigStorage. That would probably be useful as an
>>> alternative.
>>>
>>> Accessing an index that doesn't exist causes an exception.. I'm not
>>> sure what you'd like us to do there; there isn't really a way for Pig
>>> to know what you meant when you split by comma and accessed the third
>>> element that turned out to not exist.
>>>
>>> Performance-wise, they should be roughly equivalent.
>>>
>>> The error handling you are getting by checking size of returned array
>>> can easily be replicated by simply checking for nulls after loading
>>> using PigStorage.
>>>
>>> D
>>>
>>> On Thu, Jun 9, 2011 at 4:53 AM, Daniel Eklund <do...@gmail.com> wrote:
>>> > Recently I uncovered a nasty situation in my data that caused an
>>> > IndexOutOfBoundsException.
>>> > I am including a sample pig script and data (at the bottom) that
>>> illuminate
>>> > the concern.
>>> >
>>> > Succinctly:  records JOINed from one relation to another would throw an
>>> > IndexOutOfBoundsException if 1) the columns were derived from a
>>> PigStorage()
>>> > load of one large data:chararray followed by a STRSPLIT() of that data
>>> into
>>> > the proper amount of columns, an 2) there were bad records of
>>> insufficient
>>> > length (by STRSPLIT delimiter).
>>> >
>>> > Why this is interesting is that if I were to use the PisgStorage() with
>>> the
>>> > delimiter directly, then the bad records would be silently dropped and
>>> the
>>> > JOIN would proceed WITHOUT throwing an exception (which is always good).
>>> >
>>> > Once I discovered that the semantically equal (IMHO) notions of loading
>>> a
>>> > line as one big chararray and STRSPLITTING() on the delimiter is
>>> slightly
>>> > different from loading using the PigStorage() with the delimiter
>>> directly, I
>>> > realized I had to use a workaround as such:
>>> >
>>> >     GOOD_RECORDS = FILTER RELATION_FROM_STRSPLIT by SIZE(*) == <my
>>> expected
>>> > column count>;
>>> >
>>> > This was a silver lining of sorts as now I could something like
>>> >
>>> > SPLIT RELATION_FROM_STRSPLIT into
>>> >     GOOD_RECORDS if SIZE(*) == <my expected column count>,
>>> >     BAD_RECORDS  if SIZE(*) != <my expected column count>;
>>> >
>>> > and store the bad records for later analysis and remediation.
>>> >
>>> > So, my questions:  Firstly, I feel I should file a bug for the exception
>>> > (they just never are a good thing to see).  Secondly, I am thinking of
>>> > applying the "load first, STRSPLIT second" pattern consistenly whenever
>>> I
>>> > load my data, as it allows me the ability to report out on bad data.
>>> >
>>> > What does everyone feel about the performance of such a pattern?  I
>>> would
>>> > think that the difference  should be negligible.
>>> >
>>> > thanks for any insight,
>>> > daniel
>>> >
>>> >
>>> > pig script
>>> > -----------
>>> >
>>> > my_data = LOAD 'test.txt' using PigStorage(',')
>>> >      as       (age        :int,
>>> >             eye_color    :chararray,
>>> >             height          :int,
>>> >             name           :chararray);
>>> >
>>> >
>>> > my_data_raw =  LOAD 'test.txt' as (data:chararray);
>>> > my_data_from_split = FOREACH my_data_raw generate
>>> >           FLATTEN(STRSPLIT(data,','))
>>> >            as    (age        :int,
>>> >             eye_color    :chararray,
>>> >             height        :int,
>>> >             name        :chararray);
>>> >
>>> >
>>> > my_names = LOAD 'name.txt' using PigStorage(',')
>>> >      as         (name_key    :chararray,
>>> >             first        :chararray,
>>> >             last        :chararray);
>>> >
>>> > -- this one has no exception
>>> > joined = JOIN my_data by name,
>>> >                 my_names by name_key;
>>> >
>>> > -- this one throws an exception
>>> > bad_joined = JOIN my_data_from_split by name,
>>> >                 my_names by name_key;
>>> >
>>> >
>>> > -------- Sameple test.txt ----
>>> > 24,brown,56,daniel
>>> > 24,blue,57,janice
>>> > 34,blue,23,arthi
>>> > 43,blue,53,john
>>> > 33,brown,23,apu
>>> > 33,brown,64,ponce
>>> > 34,green,23,jeaninine
>>> > 25,brown,23,rachael
>>> > 35,brown,43,Wolde
>>> > 32,brown,33,gregory
>>> > 35,brown,53,vlad
>>> > 23,brown,64,emilda
>>> > 33,blue,43,ravi
>>> > 33,green,53,brendan
>>> > 15,blue,43,ravichandra
>>> > 15,brown,46,leonor
>>> > 18,blue,23,caeser
>>> > 23,JCVD             <-- here is the bad data
>>> > 33,blue,46,anthony
>>> > 23,blue,13,xavier
>>> > 18,blue,33,patrick
>>> > 33,brown,44,sang
>>> > 18,brown,45,ari
>>> > 24,green,46,vance
>>> > 33,brown,23,qi
>>> > 29,green,24,eloise
>>> > 33,blue ,29,elaine
>>> >
>>> >
>>> >
>>> > --- Exception thrown ---
>>> > java.lang.IndexOutOfBoundsException: Index: 14, Size: 14
>>> >    at java.util.ArrayList.RangeCheck(ArrayList.java:547)
>>> >    at java.util.ArrayList.get(ArrayList.java:322)
>>> >    at org.apache.pig.data.DefaultTuple.get(DefaultTuple.java:158)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getValueTuple(POFRJoin.java:403)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:261)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit.getNext(POLimit.java:85)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:256)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:236)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:231)
>>> >    at
>>> >
>>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
>>> >    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>>> >    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:646)
>>> >    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:322)
>>> >    at
>>> > org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>> >
>>>
>>
>>
>

Re: Two questions: should I file a bug? and is this workaround performant?

Posted by Daniel Eklund <do...@gmail.com>.
i think i see what you're saying about STRSPLIT()... if i use it without the
# of maxsplits, it returns as many as it can find...
so with PigStorage i get
    (23,JCVD,  , )
but with the unspecified maxsplits I get
    (23,JCVD)
so, I could use maxsplits as equal to the number of columns I am expecting.

understanding a bit better now

On Thu, Jun 9, 2011 at 6:03 PM, Daniel Eklund <do...@gmail.com> wrote:

> right, but I am not _explicitly_ accessing by index.. it's the byproduct of
> the JOIN.
>
> I think the join operator could assert for the column to join on, if it
> exists and silently fail if it doesn't (kinda like how PigStorage silently
> fails to load those records that are inconsistent with the schema).
>
> thanks for the feedback on the performance
>
>
> On Thu, Jun 9, 2011 at 5:57 PM, Dmitriy Ryaboy <dv...@gmail.com> wrote:
>
>> I think this is one of those "works as designed" cases.
>>
>> PigStorage splits by a character, and returns the fields that are
>> generated as a result. If you give pigstorage a schema, it will create
>> as many columns as you specify -- padding nulls as needed, and
>> dropping extra columns as required, to match the schema you dictate.
>> So, no surprises there.
>>
>> STRSPLIT returns a variable number of fields, depending on how many
>> occur. No surprises there, either. One could write a STRSPLIT
>> equivalent that is given a number of fields to return, and make it
>> behave like PigStorage. That would probably be useful as an
>> alternative.
>>
>> Accessing an index that doesn't exist causes an exception.. I'm not
>> sure what you'd like us to do there; there isn't really a way for Pig
>> to know what you meant when you split by comma and accessed the third
>> element that turned out to not exist.
>>
>> Performance-wise, they should be roughly equivalent.
>>
>> The error handling you are getting by checking size of returned array
>> can easily be replicated by simply checking for nulls after loading
>> using PigStorage.
>>
>> D
>>
>> On Thu, Jun 9, 2011 at 4:53 AM, Daniel Eklund <do...@gmail.com> wrote:
>> > Recently I uncovered a nasty situation in my data that caused an
>> > IndexOutOfBoundsException.
>> > I am including a sample pig script and data (at the bottom) that
>> illuminate
>> > the concern.
>> >
>> > Succinctly:  records JOINed from one relation to another would throw an
>> > IndexOutOfBoundsException if 1) the columns were derived from a
>> PigStorage()
>> > load of one large data:chararray followed by a STRSPLIT() of that data
>> into
>> > the proper amount of columns, an 2) there were bad records of
>> insufficient
>> > length (by STRSPLIT delimiter).
>> >
>> > Why this is interesting is that if I were to use the PisgStorage() with
>> the
>> > delimiter directly, then the bad records would be silently dropped and
>> the
>> > JOIN would proceed WITHOUT throwing an exception (which is always good).
>> >
>> > Once I discovered that the semantically equal (IMHO) notions of loading
>> a
>> > line as one big chararray and STRSPLITTING() on the delimiter is
>> slightly
>> > different from loading using the PigStorage() with the delimiter
>> directly, I
>> > realized I had to use a workaround as such:
>> >
>> >     GOOD_RECORDS = FILTER RELATION_FROM_STRSPLIT by SIZE(*) == <my
>> expected
>> > column count>;
>> >
>> > This was a silver lining of sorts as now I could something like
>> >
>> > SPLIT RELATION_FROM_STRSPLIT into
>> >     GOOD_RECORDS if SIZE(*) == <my expected column count>,
>> >     BAD_RECORDS  if SIZE(*) != <my expected column count>;
>> >
>> > and store the bad records for later analysis and remediation.
>> >
>> > So, my questions:  Firstly, I feel I should file a bug for the exception
>> > (they just never are a good thing to see).  Secondly, I am thinking of
>> > applying the "load first, STRSPLIT second" pattern consistenly whenever
>> I
>> > load my data, as it allows me the ability to report out on bad data.
>> >
>> > What does everyone feel about the performance of such a pattern?  I
>> would
>> > think that the difference  should be negligible.
>> >
>> > thanks for any insight,
>> > daniel
>> >
>> >
>> > pig script
>> > -----------
>> >
>> > my_data = LOAD 'test.txt' using PigStorage(',')
>> >      as       (age        :int,
>> >             eye_color    :chararray,
>> >             height          :int,
>> >             name           :chararray);
>> >
>> >
>> > my_data_raw =  LOAD 'test.txt' as (data:chararray);
>> > my_data_from_split = FOREACH my_data_raw generate
>> >           FLATTEN(STRSPLIT(data,','))
>> >            as    (age        :int,
>> >             eye_color    :chararray,
>> >             height        :int,
>> >             name        :chararray);
>> >
>> >
>> > my_names = LOAD 'name.txt' using PigStorage(',')
>> >      as         (name_key    :chararray,
>> >             first        :chararray,
>> >             last        :chararray);
>> >
>> > -- this one has no exception
>> > joined = JOIN my_data by name,
>> >                 my_names by name_key;
>> >
>> > -- this one throws an exception
>> > bad_joined = JOIN my_data_from_split by name,
>> >                 my_names by name_key;
>> >
>> >
>> > -------- Sameple test.txt ----
>> > 24,brown,56,daniel
>> > 24,blue,57,janice
>> > 34,blue,23,arthi
>> > 43,blue,53,john
>> > 33,brown,23,apu
>> > 33,brown,64,ponce
>> > 34,green,23,jeaninine
>> > 25,brown,23,rachael
>> > 35,brown,43,Wolde
>> > 32,brown,33,gregory
>> > 35,brown,53,vlad
>> > 23,brown,64,emilda
>> > 33,blue,43,ravi
>> > 33,green,53,brendan
>> > 15,blue,43,ravichandra
>> > 15,brown,46,leonor
>> > 18,blue,23,caeser
>> > 23,JCVD             <-- here is the bad data
>> > 33,blue,46,anthony
>> > 23,blue,13,xavier
>> > 18,blue,33,patrick
>> > 33,brown,44,sang
>> > 18,brown,45,ari
>> > 24,green,46,vance
>> > 33,brown,23,qi
>> > 29,green,24,eloise
>> > 33,blue ,29,elaine
>> >
>> >
>> >
>> > --- Exception thrown ---
>> > java.lang.IndexOutOfBoundsException: Index: 14, Size: 14
>> >    at java.util.ArrayList.RangeCheck(ArrayList.java:547)
>> >    at java.util.ArrayList.get(ArrayList.java:322)
>> >    at org.apache.pig.data.DefaultTuple.get(DefaultTuple.java:158)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getValueTuple(POFRJoin.java:403)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:261)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit.getNext(POLimit.java:85)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:256)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:236)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:231)
>> >    at
>> >
>> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
>> >    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>> >    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:646)
>> >    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:322)
>> >    at
>> > org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>> >
>>
>
>

Re: Two questions: should I file a bug? and is this workaround performant?

Posted by Daniel Eklund <do...@gmail.com>.
right, but I am not _explicitly_ accessing by index.. it's the byproduct of
the JOIN.

I think the join operator could assert for the column to join on, if it
exists and silently fail if it doesn't (kinda like how PigStorage silently
fails to load those records that are inconsistent with the schema).

thanks for the feedback on the performance

On Thu, Jun 9, 2011 at 5:57 PM, Dmitriy Ryaboy <dv...@gmail.com> wrote:

> I think this is one of those "works as designed" cases.
>
> PigStorage splits by a character, and returns the fields that are
> generated as a result. If you give pigstorage a schema, it will create
> as many columns as you specify -- padding nulls as needed, and
> dropping extra columns as required, to match the schema you dictate.
> So, no surprises there.
>
> STRSPLIT returns a variable number of fields, depending on how many
> occur. No surprises there, either. One could write a STRSPLIT
> equivalent that is given a number of fields to return, and make it
> behave like PigStorage. That would probably be useful as an
> alternative.
>
> Accessing an index that doesn't exist causes an exception.. I'm not
> sure what you'd like us to do there; there isn't really a way for Pig
> to know what you meant when you split by comma and accessed the third
> element that turned out to not exist.
>
> Performance-wise, they should be roughly equivalent.
>
> The error handling you are getting by checking size of returned array
> can easily be replicated by simply checking for nulls after loading
> using PigStorage.
>
> D
>
> On Thu, Jun 9, 2011 at 4:53 AM, Daniel Eklund <do...@gmail.com> wrote:
> > Recently I uncovered a nasty situation in my data that caused an
> > IndexOutOfBoundsException.
> > I am including a sample pig script and data (at the bottom) that
> illuminate
> > the concern.
> >
> > Succinctly:  records JOINed from one relation to another would throw an
> > IndexOutOfBoundsException if 1) the columns were derived from a
> PigStorage()
> > load of one large data:chararray followed by a STRSPLIT() of that data
> into
> > the proper amount of columns, an 2) there were bad records of
> insufficient
> > length (by STRSPLIT delimiter).
> >
> > Why this is interesting is that if I were to use the PisgStorage() with
> the
> > delimiter directly, then the bad records would be silently dropped and
> the
> > JOIN would proceed WITHOUT throwing an exception (which is always good).
> >
> > Once I discovered that the semantically equal (IMHO) notions of loading a
> > line as one big chararray and STRSPLITTING() on the delimiter is slightly
> > different from loading using the PigStorage() with the delimiter
> directly, I
> > realized I had to use a workaround as such:
> >
> >     GOOD_RECORDS = FILTER RELATION_FROM_STRSPLIT by SIZE(*) == <my
> expected
> > column count>;
> >
> > This was a silver lining of sorts as now I could something like
> >
> > SPLIT RELATION_FROM_STRSPLIT into
> >     GOOD_RECORDS if SIZE(*) == <my expected column count>,
> >     BAD_RECORDS  if SIZE(*) != <my expected column count>;
> >
> > and store the bad records for later analysis and remediation.
> >
> > So, my questions:  Firstly, I feel I should file a bug for the exception
> > (they just never are a good thing to see).  Secondly, I am thinking of
> > applying the "load first, STRSPLIT second" pattern consistenly whenever I
> > load my data, as it allows me the ability to report out on bad data.
> >
> > What does everyone feel about the performance of such a pattern?  I would
> > think that the difference  should be negligible.
> >
> > thanks for any insight,
> > daniel
> >
> >
> > pig script
> > -----------
> >
> > my_data = LOAD 'test.txt' using PigStorage(',')
> >      as       (age        :int,
> >             eye_color    :chararray,
> >             height          :int,
> >             name           :chararray);
> >
> >
> > my_data_raw =  LOAD 'test.txt' as (data:chararray);
> > my_data_from_split = FOREACH my_data_raw generate
> >           FLATTEN(STRSPLIT(data,','))
> >            as    (age        :int,
> >             eye_color    :chararray,
> >             height        :int,
> >             name        :chararray);
> >
> >
> > my_names = LOAD 'name.txt' using PigStorage(',')
> >      as         (name_key    :chararray,
> >             first        :chararray,
> >             last        :chararray);
> >
> > -- this one has no exception
> > joined = JOIN my_data by name,
> >                 my_names by name_key;
> >
> > -- this one throws an exception
> > bad_joined = JOIN my_data_from_split by name,
> >                 my_names by name_key;
> >
> >
> > -------- Sameple test.txt ----
> > 24,brown,56,daniel
> > 24,blue,57,janice
> > 34,blue,23,arthi
> > 43,blue,53,john
> > 33,brown,23,apu
> > 33,brown,64,ponce
> > 34,green,23,jeaninine
> > 25,brown,23,rachael
> > 35,brown,43,Wolde
> > 32,brown,33,gregory
> > 35,brown,53,vlad
> > 23,brown,64,emilda
> > 33,blue,43,ravi
> > 33,green,53,brendan
> > 15,blue,43,ravichandra
> > 15,brown,46,leonor
> > 18,blue,23,caeser
> > 23,JCVD             <-- here is the bad data
> > 33,blue,46,anthony
> > 23,blue,13,xavier
> > 18,blue,33,patrick
> > 33,brown,44,sang
> > 18,brown,45,ari
> > 24,green,46,vance
> > 33,brown,23,qi
> > 29,green,24,eloise
> > 33,blue ,29,elaine
> >
> >
> >
> > --- Exception thrown ---
> > java.lang.IndexOutOfBoundsException: Index: 14, Size: 14
> >    at java.util.ArrayList.RangeCheck(ArrayList.java:547)
> >    at java.util.ArrayList.get(ArrayList.java:322)
> >    at org.apache.pig.data.DefaultTuple.get(DefaultTuple.java:158)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getValueTuple(POFRJoin.java:403)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:261)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit.getNext(POLimit.java:85)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:256)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:236)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:231)
> >    at
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
> >    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> >    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:646)
> >    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:322)
> >    at
> > org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
> >
>

Re: Two questions: should I file a bug? and is this workaround performant?

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
I think this is one of those "works as designed" cases.

PigStorage splits by a character, and returns the fields that are
generated as a result. If you give pigstorage a schema, it will create
as many columns as you specify -- padding nulls as needed, and
dropping extra columns as required, to match the schema you dictate.
So, no surprises there.

STRSPLIT returns a variable number of fields, depending on how many
occur. No surprises there, either. One could write a STRSPLIT
equivalent that is given a number of fields to return, and make it
behave like PigStorage. That would probably be useful as an
alternative.

Accessing an index that doesn't exist causes an exception.. I'm not
sure what you'd like us to do there; there isn't really a way for Pig
to know what you meant when you split by comma and accessed the third
element that turned out to not exist.

Performance-wise, they should be roughly equivalent.

The error handling you are getting by checking size of returned array
can easily be replicated by simply checking for nulls after loading
using PigStorage.

D

On Thu, Jun 9, 2011 at 4:53 AM, Daniel Eklund <do...@gmail.com> wrote:
> Recently I uncovered a nasty situation in my data that caused an
> IndexOutOfBoundsException.
> I am including a sample pig script and data (at the bottom) that illuminate
> the concern.
>
> Succinctly:  records JOINed from one relation to another would throw an
> IndexOutOfBoundsException if 1) the columns were derived from a PigStorage()
> load of one large data:chararray followed by a STRSPLIT() of that data into
> the proper amount of columns, an 2) there were bad records of insufficient
> length (by STRSPLIT delimiter).
>
> Why this is interesting is that if I were to use the PisgStorage() with the
> delimiter directly, then the bad records would be silently dropped and the
> JOIN would proceed WITHOUT throwing an exception (which is always good).
>
> Once I discovered that the semantically equal (IMHO) notions of loading a
> line as one big chararray and STRSPLITTING() on the delimiter is slightly
> different from loading using the PigStorage() with the delimiter directly, I
> realized I had to use a workaround as such:
>
>     GOOD_RECORDS = FILTER RELATION_FROM_STRSPLIT by SIZE(*) == <my expected
> column count>;
>
> This was a silver lining of sorts as now I could something like
>
> SPLIT RELATION_FROM_STRSPLIT into
>     GOOD_RECORDS if SIZE(*) == <my expected column count>,
>     BAD_RECORDS  if SIZE(*) != <my expected column count>;
>
> and store the bad records for later analysis and remediation.
>
> So, my questions:  Firstly, I feel I should file a bug for the exception
> (they just never are a good thing to see).  Secondly, I am thinking of
> applying the "load first, STRSPLIT second" pattern consistenly whenever I
> load my data, as it allows me the ability to report out on bad data.
>
> What does everyone feel about the performance of such a pattern?  I would
> think that the difference  should be negligible.
>
> thanks for any insight,
> daniel
>
>
> pig script
> -----------
>
> my_data = LOAD 'test.txt' using PigStorage(',')
>      as       (age        :int,
>             eye_color    :chararray,
>             height          :int,
>             name           :chararray);
>
>
> my_data_raw =  LOAD 'test.txt' as (data:chararray);
> my_data_from_split = FOREACH my_data_raw generate
>           FLATTEN(STRSPLIT(data,','))
>            as    (age        :int,
>             eye_color    :chararray,
>             height        :int,
>             name        :chararray);
>
>
> my_names = LOAD 'name.txt' using PigStorage(',')
>      as         (name_key    :chararray,
>             first        :chararray,
>             last        :chararray);
>
> -- this one has no exception
> joined = JOIN my_data by name,
>                 my_names by name_key;
>
> -- this one throws an exception
> bad_joined = JOIN my_data_from_split by name,
>                 my_names by name_key;
>
>
> -------- Sameple test.txt ----
> 24,brown,56,daniel
> 24,blue,57,janice
> 34,blue,23,arthi
> 43,blue,53,john
> 33,brown,23,apu
> 33,brown,64,ponce
> 34,green,23,jeaninine
> 25,brown,23,rachael
> 35,brown,43,Wolde
> 32,brown,33,gregory
> 35,brown,53,vlad
> 23,brown,64,emilda
> 33,blue,43,ravi
> 33,green,53,brendan
> 15,blue,43,ravichandra
> 15,brown,46,leonor
> 18,blue,23,caeser
> 23,JCVD             <-- here is the bad data
> 33,blue,46,anthony
> 23,blue,13,xavier
> 18,blue,33,patrick
> 33,brown,44,sang
> 18,brown,45,ari
> 24,green,46,vance
> 33,brown,23,qi
> 29,green,24,eloise
> 33,blue ,29,elaine
>
>
>
> --- Exception thrown ---
> java.lang.IndexOutOfBoundsException: Index: 14, Size: 14
>    at java.util.ArrayList.RangeCheck(ArrayList.java:547)
>    at java.util.ArrayList.get(ArrayList.java:322)
>    at org.apache.pig.data.DefaultTuple.get(DefaultTuple.java:158)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getValueTuple(POFRJoin.java:403)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:261)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin.getNext(POFRJoin.java:241)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit.getNext(POLimit.java:85)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
>    at
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange.getNext(POLocalRearrange.java:256)
>    at
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.runPipeline(PigMapBase.java:236)
>    at
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:231)
>    at
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.map(PigMapBase.java:53)
>    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:646)
>    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:322)
>    at
> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>