Posted to user@flink.apache.org by Catlyn Kong <ca...@yelp.com> on 2019/09/09 18:15:55 UTC

How to handle avro BYTES type in flink

Hi fellow streamers,

I'm trying to support avro BYTES type in my flink application. Since
ByteBuffer isn't a supported type, I'm converting the field to an
Array[Byte]:

case Type.BYTES =>
  (avroObj: AnyRef) => {
    if (avroObj == null) {
      null
    } else {
      val byteBuffer = avroObj.asInstanceOf[ByteBuffer]
      // Copy the readable bytes; note that get() advances the buffer's position.
      val bytes = new Array[Byte](byteBuffer.remaining())
      byteBuffer.get(bytes)
      bytes
    }
  }

And in the table, I'm creating PrimitiveArrayTypeInfo[Byte] for this field.
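As a side note, one thing to watch with this kind of conversion is that get() consumes the buffer, so a second read of the same field would see remaining() == 0. A minimal, self-contained sketch of a copy that leaves the caller's buffer untouched (the object and method names here are made up for illustration, and this is plain java.nio, no Flink or Avro involved):

```scala
import java.nio.ByteBuffer

object ByteBufferCopy {
  // Copy the readable bytes out of a ByteBuffer without mutating the
  // caller's view: duplicate() shares the content but has its own
  // position/limit, so the original buffer can still be read afterwards.
  def toByteArray(buffer: ByteBuffer): Array[Byte] = {
    val view = buffer.duplicate()
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    bytes
  }

  def main(args: Array[String]): Unit = {
    val heap = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
    val copied = ByteBufferCopy.toByteArray(heap)
    println(copied.mkString(","))   // 1,2,3,4
    println(heap.remaining())       // still 4: the original position is untouched
  }
}
```

This also works for direct and read-only buffers, since it never touches the backing array directly.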
I'm getting ArrayIndexOutOfBoundsException:

Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)

Does anyone have experience deserializing the BYTES type from avro and
making it compatible with the table api? I'm wondering whether I used the
wrong type, or whether I need to verify that there's enough data left in
the source?

Any input is appreciated.

Thanks!
Catlyn

Re: How to handle avro BYTES type in flink

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for reporting back Catlyn!

On Thu, Sep 12, 2019 at 7:40 PM Catlyn Kong <ca...@yelp.com> wrote:

> Turns out there was some other deserialization problem unrelated to this.
>

Re: How to handle avro BYTES type in flink

Posted by Catlyn Kong <ca...@yelp.com>.
Turns out there was some other deserialization problem unrelated to this.
