You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@avro.apache.org by Scott Carey <sc...@apache.org> on 2011/09/01 03:52:25 UTC

Re: avro BinaryDecoder bug ?

Looks like a bug to me.

Can you file a JIRA ticket?

Thanks!

On 8/29/11 1:24 PM, "Yang" <te...@gmail.com> wrote:

>if I read on a empty file with BinaryDecoder, I get EOF, good,
>
>but with the current code, if I read it again with the same decoder, I
>get a IndexOutofBoundException, not EOF.
>
>it seems that always giving EOF should be a more desirable behavior.
>
>you can see from this test code:
>
>import static org.junit.Assert.assertEquals;
>
>import java.io.IOException;
>
>import org.apache.avro.specific.SpecificRecord;
>import org.junit.Test;
>
>import myavro.Apple;
>
>import java.io.File;
>import java.io.FileInputStream;
>import java.io.FileNotFoundException;
>import java.io.FileOutputStream;
>import java.io.InputStream;
>import java.io.OutputStream;
>
>import org.apache.avro.io.Decoder;
>import org.apache.avro.io.DecoderFactory;
>import org.apache.avro.io.Encoder;
>import org.apache.avro.io.EncoderFactory;
>import org.apache.avro.specific.SpecificDatumReader;
>import org.apache.avro.specific.SpecificDatumWriter;
>
>class MyWriter {
>
>    SpecificDatumWriter<SpecificRecord> wr;
>    Encoder enc;
>    OutputStream ostream;
>
>    public MyWriter() throws FileNotFoundException {
>        wr = new SpecificDatumWriter<SpecificRecord>(new
>Apple().getSchema());
>        ostream = new FileOutputStream(new File("/tmp/testavro"));
>        enc = EncoderFactory.get().binaryEncoder(ostream, null);
>    }
>
>    public synchronized void dump(SpecificRecord event) throws
>IOException {
>        wr.write(event, enc);
>        enc.flush();
>    }
>
>}
>
>class MyReader {
>
>    SpecificDatumReader<SpecificRecord> rd;
>    Decoder dec;
>    InputStream istream;
>
>    public MyReader() throws FileNotFoundException {
>        rd = new SpecificDatumReader<SpecificRecord>(new
>Apple().getSchema());
>        istream = new FileInputStream(new File("/tmp/testavro"));
>        dec = DecoderFactory.get().binaryDecoder(istream, null);
>    }
>
>    public synchronized SpecificRecord read() throws IOException {
>        Object r = rd.read(null, dec);
>        return (SpecificRecord) r;
>    }
>
>}
>
>public class AvroWriteAndReadSameTime {
>    @Test
>    public void testWritingAndReadingAtSameTime() throws Exception {
>
>        MyWriter dumper = new MyWriter();
>        final Apple apple = new Apple();
>        apple.taste = "sweet";
>        dumper.dump(apple);
>
>        final MyReader rd = new MyReader();
>        rd.read();
>
>
>        try {
>        rd.read();
>        } catch (Exception e) {
>            e.printStackTrace();
>        }
>
>        // the second one somehow generates a NPE, we hope to get EOF...
>        try {
>        rd.read();
>        } catch (Exception e) {
>            e.printStackTrace();
>        }
>
>    }
>}
>
>
>
>
>
>the issue is in BinaryDecoder.readInt(), right now even when it hits
>EOF, it still advances the pos pointer.
>all the other APIs (readLong readFloat ...) do not do this. changing
>to the following  makes it work:
>
>
>  @Override
>  public int readInt() throws IOException {
>    ensureBounds(5); // won't throw index out of bounds
>    int len = 1;
>    int b = buf[pos] & 0xff;
>    int n = b & 0x7f;
>    if (b > 0x7f) {
>      b = buf[pos + len++] & 0xff;
>      n ^= (b & 0x7f) << 7;
>      if (b > 0x7f) {
>        b = buf[pos + len++] & 0xff;
>        n ^= (b & 0x7f) << 14;
>        if (b > 0x7f) {
>          b = buf[pos + len++] & 0xff;
>          n ^= (b & 0x7f) << 21;
>          if (b > 0x7f) {
>            b = buf[pos + len++] & 0xff;
>            n ^= (b & 0x7f) << 28;
>            if (b > 0x7f) {
>              throw new IOException("Invalid int encoding");
>            }
>          }
>        }
>      }
>    }
>    if (pos+len > limit) {
>      throw new EOFException();
>    }
>    pos += len;             //<================== CHANGE, used to be
>above the EOF throw
>
>    return (n >>> 1) ^ -(n & 1); // back to two's-complement
>  }



Re: Avro MR job problem with empty strings

Posted by Friso van Vollenhoven <fv...@xebia.com>.
Thanks for the ideas!

It turns out it was my bad after all. Silly mistake: I was feeding the comparator with the full map output schema, not just the key part of the pair schema. So it was expecting more than just the key when comparing. Also, it surfaces for more than just blanks, but not always. Which is why I was suspecting empty Utf8's were the problem.

Sorry for thinking it could be a bug outside of my own codeā€¦


Thanks,
Friso




On 3 sep. 2011, at 02:38, Scott Carey wrote:

> Some ideas:
> 
> A String is encoded as a Long length, followed by that number of bytes in
> Utf8.
> An empty string is therefore encoded as the number 0L -- which is one
> byte, 0x00.
> It appears that it is trying to skip a string or Long, but it is the end
> of the byte[].
> 
> So either it is expecting a Long or String to skip, and there is nothing
> there.  Perhaps the empty String was not encoded as an empty string, but
> skipped.  Perhaps a Long count or other number (What is the Schema being
> compared?)  
> 
> WordCount is often key = word, val = count, and so it would need to read
> the string word, and skip the long count.  If either of these is left out
> and not written, I would expect the sort of error below.
> 
> I hope that helps,
> 
> -Scott
> 
> On 9/1/11 5:42 AM, "Friso van Vollenhoven" <fv...@xebia.com>
> wrote:
> 
>> Hi All,
>> 
>> I am working on a modified version of the Avro MapReduce support to make
>> it play nice with the new Hadoop API (0.20.2). Most of the code if
>> borrowed from the Avro mapred package, but I decided not to fully
>> abstract away the Mapper and Reducer classes (like Avro does now using
>> HadoopMapper and HadoopReducer classes). All else is much the same as the
>> mapred implementation.
>> 
>> When testing, I ran into a issues when emitting empty strings (empty
>> Utf8) from the mapper as key. I get the following:
>> org.apache.avro.AvroRuntimeException: java.io.EOFException
>> 	at org.apache.avro.io.BinaryData.compare(BinaryData.java:74)
>> 	at org.apache.avro.io.BinaryData.compare(BinaryData.java:60)
>> 	at 
>> org.apache.avro.mapreduce.AvroKeyComparator.compare(AvroKeyComparator.java
>> :45)        <== this is my own code
>> 	at 
>> org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:
>> 120)
>> 	at 
>> org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
>> 	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
>> 	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
>> 	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
>> 	at 
>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
>> Caused by: java.io.EOFException
>> 	at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:182)
>> 	at 
>> org.apache.avro.generic.GenericDatumReader.skip(GenericDatumReader.java:38
>> 9)
>> 	at org.apache.avro.io.BinaryData.compare(BinaryData.java:86)
>> 	at org.apache.avro.io.BinaryData.compare(BinaryData.java:72)
>> 	... 8 more
>> 
>> 
>> The root cause stack trace is as follows (taken from debugger, breakpoint
>> on the throw new EOFException(); line):
>> Thread [Thread-11] (Suspended (breakpoint at line 182 in BinaryDecoder))	
>> 	BinaryDecoder.readLong() line: 182	
>> 	GenericDatumReader<D>.skip(Schema, Decoder) line: 389	
>> 	BinaryData.compare(BinaryData$Decoders, Schema) line: 86	
>> 	BinaryData.compare(byte[], int, int, byte[], int, int, Schema) line: 72	
>> 	BinaryData.compare(byte[], int, byte[], int, Schema) line: 60	
>> 	AvroKeyComparator<T>.compare(byte[], int, int, byte[], int, int) line:
>> 45	
>> 	Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKeyValu
>> e() line: 120	
>> 	Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKey()
>> line: 92	
>> 	AvroMapReduceTest$WordCountingAvroReducer(Reducer<KEYIN,VALUEIN,KEYOUT,VA
>> LUEOUT>).run(Reducer<KEYIN,VALUEIN,KEYOUT,Contex>) line: 175	
>> 	ReduceTask.runNewReducer(JobConf, TaskUmbilicalProtocol, TaskReporter,
>> RawKeyValueIterator, RawComparator<INKEY>, Class<INKEY>, Class<INVALUE>)
>> line: 572	
>> 	ReduceTask.run(JobConf, TaskUmbilicalProtocol) line: 414	
>> 	LocalJobRunner$Job.run() line: 256	
>> 
>> I went through the decoding code to see where this comes from, but I
>> can't immediately spot where it goes wrong. I am guessing the actual
>> problem is earlier during execution where it possibly increases pos too
>> often.
>> 
>> Has anyone experienced this? I can live without emitting empty keys from
>> MR jobs, but I ran into this implementing a word count job on a text file
>> with empty lines (counting those could be a valid use case). I am using
>> Avro 1.5.2.
>> 
>> Thanks for any clues.
>> 
>> 
>> Cheers,
>> Friso
>> 
> 
> 


Re: Avro MR job problem with empty strings

Posted by Scott Carey <sc...@apache.org>.
Some ideas:

A String is encoded as a Long length, followed by that number of bytes in
Utf8.
An empty string is therefore encoded as the number 0L -- which is one
byte, 0x00.
It appears that it is trying to skip a string or Long, but it is the end
of the byte[].

So either it is expecting a Long or String to skip, and there is nothing
there.  Perhaps the empty String was not encoded as an empty string, but
skipped.  Perhaps a Long count or other number (What is the Schema being
compared?)  

WordCount is often key = word, val = count, and so it would need to read
the string word, and skip the long count.  If either of these is left out
and not written, I would expect the sort of error below.

I hope that helps,

-Scott

On 9/1/11 5:42 AM, "Friso van Vollenhoven" <fv...@xebia.com>
wrote:

>Hi All,
>
>I am working on a modified version of the Avro MapReduce support to make
>it play nice with the new Hadoop API (0.20.2). Most of the code if
>borrowed from the Avro mapred package, but I decided not to fully
>abstract away the Mapper and Reducer classes (like Avro does now using
>HadoopMapper and HadoopReducer classes). All else is much the same as the
>mapred implementation.
>
>When testing, I ran into a issues when emitting empty strings (empty
>Utf8) from the mapper as key. I get the following:
>org.apache.avro.AvroRuntimeException: java.io.EOFException
>	at org.apache.avro.io.BinaryData.compare(BinaryData.java:74)
>	at org.apache.avro.io.BinaryData.compare(BinaryData.java:60)
>	at 
>org.apache.avro.mapreduce.AvroKeyComparator.compare(AvroKeyComparator.java
>:45)        <== this is my own code
>	at 
>org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:
>120)
>	at 
>org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
>	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
>	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
>	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
>	at 
>org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
>Caused by: java.io.EOFException
>	at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:182)
>	at 
>org.apache.avro.generic.GenericDatumReader.skip(GenericDatumReader.java:38
>9)
>	at org.apache.avro.io.BinaryData.compare(BinaryData.java:86)
>	at org.apache.avro.io.BinaryData.compare(BinaryData.java:72)
>	... 8 more
>
>
>The root cause stack trace is as follows (taken from debugger, breakpoint
>on the throw new EOFException(); line):
>Thread [Thread-11] (Suspended (breakpoint at line 182 in BinaryDecoder))	
>	BinaryDecoder.readLong() line: 182	
>	GenericDatumReader<D>.skip(Schema, Decoder) line: 389	
>	BinaryData.compare(BinaryData$Decoders, Schema) line: 86	
>	BinaryData.compare(byte[], int, int, byte[], int, int, Schema) line: 72	
>	BinaryData.compare(byte[], int, byte[], int, Schema) line: 60	
>	AvroKeyComparator<T>.compare(byte[], int, int, byte[], int, int) line:
>45	
>	Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKeyValu
>e() line: 120	
>	Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKey()
>line: 92	
>	AvroMapReduceTest$WordCountingAvroReducer(Reducer<KEYIN,VALUEIN,KEYOUT,VA
>LUEOUT>).run(Reducer<KEYIN,VALUEIN,KEYOUT,Contex>) line: 175	
>	ReduceTask.runNewReducer(JobConf, TaskUmbilicalProtocol, TaskReporter,
>RawKeyValueIterator, RawComparator<INKEY>, Class<INKEY>, Class<INVALUE>)
>line: 572	
>	ReduceTask.run(JobConf, TaskUmbilicalProtocol) line: 414	
>	LocalJobRunner$Job.run() line: 256	
>
>I went through the decoding code to see where this comes from, but I
>can't immediately spot where it goes wrong. I am guessing the actual
>problem is earlier during execution where it possibly increases pos too
>often.
>
>Has anyone experienced this? I can live without emitting empty keys from
>MR jobs, but I ran into this implementing a word count job on a text file
>with empty lines (counting those could be a valid use case). I am using
>Avro 1.5.2.
>
>Thanks for any clues.
>
>
>Cheers,
>Friso
>



Avro MR job problem with empty strings

Posted by Friso van Vollenhoven <fv...@xebia.com>.
Hi All,

I am working on a modified version of the Avro MapReduce support to make it play nice with the new Hadoop API (0.20.2). Most of the code if borrowed from the Avro mapred package, but I decided not to fully abstract away the Mapper and Reducer classes (like Avro does now using HadoopMapper and HadoopReducer classes). All else is much the same as the mapred implementation.

When testing, I ran into a issues when emitting empty strings (empty Utf8) from the mapper as key. I get the following:
org.apache.avro.AvroRuntimeException: java.io.EOFException
	at org.apache.avro.io.BinaryData.compare(BinaryData.java:74)
	at org.apache.avro.io.BinaryData.compare(BinaryData.java:60)
	at org.apache.avro.mapreduce.AvroKeyComparator.compare(AvroKeyComparator.java:45)        <== this is my own code
	at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:120)
	at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
Caused by: java.io.EOFException
	at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:182)
	at org.apache.avro.generic.GenericDatumReader.skip(GenericDatumReader.java:389)
	at org.apache.avro.io.BinaryData.compare(BinaryData.java:86)
	at org.apache.avro.io.BinaryData.compare(BinaryData.java:72)
	... 8 more


The root cause stack trace is as follows (taken from debugger, breakpoint on the throw new EOFException(); line):
Thread [Thread-11] (Suspended (breakpoint at line 182 in BinaryDecoder))	
	BinaryDecoder.readLong() line: 182	
	GenericDatumReader<D>.skip(Schema, Decoder) line: 389	
	BinaryData.compare(BinaryData$Decoders, Schema) line: 86	
	BinaryData.compare(byte[], int, int, byte[], int, int, Schema) line: 72	
	BinaryData.compare(byte[], int, byte[], int, Schema) line: 60	
	AvroKeyComparator<T>.compare(byte[], int, int, byte[], int, int) line: 45	
	Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKeyValue() line: 120	
	Reducer$Context(ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).nextKey() line: 92	
	AvroMapReduceTest$WordCountingAvroReducer(Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>).run(Reducer<KEYIN,VALUEIN,KEYOUT,Contex>) line: 175	
	ReduceTask.runNewReducer(JobConf, TaskUmbilicalProtocol, TaskReporter, RawKeyValueIterator, RawComparator<INKEY>, Class<INKEY>, Class<INVALUE>) line: 572	
	ReduceTask.run(JobConf, TaskUmbilicalProtocol) line: 414	
	LocalJobRunner$Job.run() line: 256	

I went through the decoding code to see where this comes from, but I can't immediately spot where it goes wrong. I am guessing the actual problem is earlier during execution where it possibly increases pos too often.

Has anyone experienced this? I can live without emitting empty keys from MR jobs, but I ran into this implementing a word count job on a text file with empty lines (counting those could be a valid use case). I am using Avro 1.5.2.

Thanks for any clues.


Cheers,
Friso


Re: avro BinaryDecoder bug ?

Posted by Yang <te...@gmail.com>.
yes https://issues.apache.org/jira/browse/AVRO-882

On Wed, Aug 31, 2011 at 6:52 PM, Scott Carey <sc...@apache.org> wrote:

> Looks like a bug to me.
>
> Can you file a JIRA ticket?
>
> Thanks!
>
> On 8/29/11 1:24 PM, "Yang" <te...@gmail.com> wrote:
>
> >if I read on a empty file with BinaryDecoder, I get EOF, good,
> >
> >but with the current code, if I read it again with the same decoder, I
> >get a IndexOutofBoundException, not EOF.
> >
> >it seems that always giving EOF should be a more desirable behavior.
> >
> >you can see from this test code:
> >
> >import static org.junit.Assert.assertEquals;
> >
> >import java.io.IOException;
> >
> >import org.apache.avro.specific.SpecificRecord;
> >import org.junit.Test;
> >
> >import myavro.Apple;
> >
> >import java.io.File;
> >import java.io.FileInputStream;
> >import java.io.FileNotFoundException;
> >import java.io.FileOutputStream;
> >import java.io.InputStream;
> >import java.io.OutputStream;
> >
> >import org.apache.avro.io.Decoder;
> >import org.apache.avro.io.DecoderFactory;
> >import org.apache.avro.io.Encoder;
> >import org.apache.avro.io.EncoderFactory;
> >import org.apache.avro.specific.SpecificDatumReader;
> >import org.apache.avro.specific.SpecificDatumWriter;
> >
> >class MyWriter {
> >
> >    SpecificDatumWriter<SpecificRecord> wr;
> >    Encoder enc;
> >    OutputStream ostream;
> >
> >    public MyWriter() throws FileNotFoundException {
> >        wr = new SpecificDatumWriter<SpecificRecord>(new
> >Apple().getSchema());
> >        ostream = new FileOutputStream(new File("/tmp/testavro"));
> >        enc = EncoderFactory.get().binaryEncoder(ostream, null);
> >    }
> >
> >    public synchronized void dump(SpecificRecord event) throws
> >IOException {
> >        wr.write(event, enc);
> >        enc.flush();
> >    }
> >
> >}
> >
> >class MyReader {
> >
> >    SpecificDatumReader<SpecificRecord> rd;
> >    Decoder dec;
> >    InputStream istream;
> >
> >    public MyReader() throws FileNotFoundException {
> >        rd = new SpecificDatumReader<SpecificRecord>(new
> >Apple().getSchema());
> >        istream = new FileInputStream(new File("/tmp/testavro"));
> >        dec = DecoderFactory.get().binaryDecoder(istream, null);
> >    }
> >
> >    public synchronized SpecificRecord read() throws IOException {
> >        Object r = rd.read(null, dec);
> >        return (SpecificRecord) r;
> >    }
> >
> >}
> >
> >public class AvroWriteAndReadSameTime {
> >    @Test
> >    public void testWritingAndReadingAtSameTime() throws Exception {
> >
> >        MyWriter dumper = new MyWriter();
> >        final Apple apple = new Apple();
> >        apple.taste = "sweet";
> >        dumper.dump(apple);
> >
> >        final MyReader rd = new MyReader();
> >        rd.read();
> >
> >
> >        try {
> >        rd.read();
> >        } catch (Exception e) {
> >            e.printStackTrace();
> >        }
> >
> >        // the second one somehow generates a NPE, we hope to get EOF...
> >        try {
> >        rd.read();
> >        } catch (Exception e) {
> >            e.printStackTrace();
> >        }
> >
> >    }
> >}
> >
> >
> >
> >
> >
> >the issue is in BinaryDecoder.readInt(), right now even when it hits
> >EOF, it still advances the pos pointer.
> >all the other APIs (readLong readFloat ...) do not do this. changing
> >to the following  makes it work:
> >
> >
> >  @Override
> >  public int readInt() throws IOException {
> >    ensureBounds(5); // won't throw index out of bounds
> >    int len = 1;
> >    int b = buf[pos] & 0xff;
> >    int n = b & 0x7f;
> >    if (b > 0x7f) {
> >      b = buf[pos + len++] & 0xff;
> >      n ^= (b & 0x7f) << 7;
> >      if (b > 0x7f) {
> >        b = buf[pos + len++] & 0xff;
> >        n ^= (b & 0x7f) << 14;
> >        if (b > 0x7f) {
> >          b = buf[pos + len++] & 0xff;
> >          n ^= (b & 0x7f) << 21;
> >          if (b > 0x7f) {
> >            b = buf[pos + len++] & 0xff;
> >            n ^= (b & 0x7f) << 28;
> >            if (b > 0x7f) {
> >              throw new IOException("Invalid int encoding");
> >            }
> >          }
> >        }
> >      }
> >    }
> >    if (pos+len > limit) {
> >      throw new EOFException();
> >    }
> >    pos += len;             //<================== CHANGE, used to be
> >above the EOF throw
> >
> >    return (n >>> 1) ^ -(n & 1); // back to two's-complement
> >  }
>
>
>