You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Sunita Arvind <su...@gmail.com> on 2016/07/27 17:10:35 UTC

Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open

For benefit of anyone else hitting the same issue, here is what I found:

The serializer I was using was extending AbstractAvroEventSerializer. This
class has a lot of adoption, so its not likely to be an issue in the
abstract class. However, I got rid of this issue by overriding the
configure method in AbstractAvroEventSerializer in my custom serializer, as
below:


public void configure(Context context) {
    int syncIntervalBytes = context.getInteger("syncIntervalBytes",
Integer.valueOf(2048000)).intValue();
    String compressionCodec = context.getString("compressionCodec", "null");
    this.writer = new ReflectDatumWriter(this.getSchema());
    this.dataFileWriter = new DataFileWriter(this.writer);
    this.dataFileWriter.setSyncInterval(syncIntervalBytes);
    try {
        CodecFactory e = CodecFactory.fromString(compressionCodec);
        this.dataFileWriter.setCodec(e);
      *  this.dataFileWriter.create(schema,out); --> added the creation *
    } catch (AvroRuntimeException var5) {
        logger.warn("Unable to instantiate avro codec with name (" +
compressionCodec + "). Compression disabled. Exception follows.",
var5);
    } catch (IOException io){
        logger.warn("Could not open dataFileWriter Exception
follows.", io.getStackTrace());
    }

}

After this, the files are getting created in hdfs just right.
I was also able to view the files in spark using spark-avro package.
Hope this is the right way to do it and the solution helps someone.
Would love to hear if anyone in avro or flume community knows of a
better way to do it.

regards
Sunita


On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <su...@gmail.com>
wrote:

> Hello Experts,
>
> I am trying to convert a custom data source received in flume into avro
> and push to hdfs. What I am attempting to do is
> syslog -> flume -> flume interceptor to convert into
> avroObject.toByteArray -> hdfs serializer which decodes the byteArray back
> to Avro
>
> The flume configuration looks like:
>
> tier1.sources.syslogsource.interceptors.i2.type=timestamp
> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
> tier1.sources.syslogsource.interceptors.i1.type =
> com.flume.CustomToAvroConvertInterceptor$Builder
>
> #hdfs sink for archival and batch analysis
> tier1.sinks.hdfssink.type = hdfs
> tier1.sinks.hdfssink.hdfs.writeFormat = Text
> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>
> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>
> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
> # roll file if it's been 10 * 60 seconds = 600
> tier1.sinks.hdfssink.hdfs.rollInterval=600
> # roll file if we get 50,000 log lines (~25MB)
> tier1.sinks.hdfssink.hdfs.rollCount=0
> tier1.sinks.hdfssink.hdfs.batchSize = 100
> tier1.sinks.hdfssink.hdfs.rollSize=0
> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
> tier1.sinks.hdfssink.channel = hdfsmem
>
> When I use tier1.sinks.hdfssink.serializer=avro_event
> I get binary data stored into hdfs which is the
> CustomToAvroConvertInterceptor.intercept(event.getbody).toByteArray ,
> however this data cannot be parsed in hive. As a result, I see all nulls in
> the column values.
> Based on -
> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
> ?
> all I am doing in RawAvroHiveSerializer.convert is to decode using binary
> Decoder.
> The exception I get seems to be unrelated to the code itself, hence
> pasting the stack trace. Will share the code if it is required to identify
> the rootcause:
>
> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to
> deliver event. Exception follows.
> org.apache.flume.EventDeliveryException:
> org.apache.avro.AvroRuntimeException: not open
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.avro.AvroRuntimeException: not open
>         at
> org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>         at
> org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>         at
> org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>         at
> org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>         at
> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>         at
> org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>
> I can reproduce this local file system as well. In the testcase, I tried
> setting the file open to append=true and still encounter the same exception.
>
> Appreciate any guidance in this regard.
>
> regards
> Sunita
>

Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open

Posted by Sunita Arvind <su...@gmail.com>.
Nicolas,

For my usecase, I have the schema so I used the maven plugin to generate
avro classes for it. However, for your usecase where you do not have that
and just have a .avro, looks like this is an easy way:

java -jar ~/avro-tools-1.7.7.jar getschema twitter.avro > twitter.avsc

Found it here - https://github.com/miguno/avro-cli-examples

Another option as you are not using java is,
On mac :
1. ruby -e "$(curl -fsSL
https://raw.githubusercontent.com/Homebrew/install/master/install)" <
/dev/null 2> /dev/null
2. brew install avro-tools
3. avro-tools getschema file.avro

Typing avro-tools should give you the usage details and that might get you
further details. Hope this helps.

regards
Sunita

On Thu, Jul 28, 2016 at 6:29 AM, Nicolas Ranc <rn...@gmail.com> wrote:

> Dear Sunita,
>
> Thank you for your fast answer.
>
> It's not exactly what i'm expected.
> I am using apache avro C++ and i would like to deserialize an .avro file.
> Just with one .avro file and Avro C++ functions i'm trying to extract the
> schematic, the keys on the schema and the data at the end of the file.
> ("avroProgram2.avro").
>
> I saw different functions: for example *http://avro.apache.org/docs/1.7.6/api/cpp/html/
> <http://avro.apache.org/docs/1.7.6/api/cpp/html/>* in the file:*
> resolving.cc*, the program use load(...) to import schema and use it
> after in the avro::resolvingDecoder.
> In my case, i can't import this schema for deserialization: i just have
> the .avro file and i'm searching function to store information from this
> file avro file (extract the schema, the keys and values). I need these
> information because i have to create new object after in Matlab - using Mex
> functions (in C++).
>
> Thanks you for your time,
> Nicolas Ranc
>
>
>
>
>
>
>
>
>
>
>
>
>
> 2016-07-27 19:10 GMT+02:00 Sunita Arvind <su...@gmail.com>:
>
>> For benefit of anyone else hitting the same issue, here is what I found:
>>
>> The serializer I was using was extending AbstractAvroEventSerializer.
>> This class has a lot of adoption, so its not likely to be an issue in the
>> abstract class. However, I got rid of this issue by overriding the
>> configure method in AbstractAvroEventSerializer in my custom serializer, as
>> below:
>>
>>
>> public void configure(Context context) {
>>     int syncIntervalBytes = context.getInteger("syncIntervalBytes", Integer.valueOf(2048000)).intValue();
>>     String compressionCodec = context.getString("compressionCodec", "null");
>>     this.writer = new ReflectDatumWriter(this.getSchema());
>>     this.dataFileWriter = new DataFileWriter(this.writer);
>>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>>     try {
>>         CodecFactory e = CodecFactory.fromString(compressionCodec);
>>         this.dataFileWriter.setCodec(e);
>>       *  this.dataFileWriter.create(schema,out); --> added the creation *
>>     } catch (AvroRuntimeException var5) {
>>         logger.warn("Unable to instantiate avro codec with name (" + compressionCodec + "). Compression disabled. Exception follows.", var5);
>>     } catch (IOException io){
>>         logger.warn("Could not open dataFileWriter Exception follows.", io.getStackTrace());
>>     }
>>
>> }
>>
>> After this, the files are getting created in hdfs just right.
>> I was also able to view the files in spark using spark-avro package.
>> Hope this is the right way to do it and the solution helps someone.
>> Would love to hear if anyone in avro or flume community knows of a better way to do it.
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <su...@gmail.com>
>> wrote:
>>
>>> Hello Experts,
>>>
>>> I am trying to convert a custom data source received in flume into avro
>>> and push to hdfs. What I am attempting to do is
>>> syslog -> flume -> flume interceptor to convert into
>>> avroObject.toByteArray -> hdfs serializer which decodes the byteArray back
>>> to Avro
>>>
>>> The flume configuration looks like:
>>>
>>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>>> tier1.sources.syslogsource.interceptors.i1.type =
>>> com.flume.CustomToAvroConvertInterceptor$Builder
>>>
>>> #hdfs sink for archival and batch analysis
>>> tier1.sinks.hdfssink.type = hdfs
>>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>>
>>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>>
>>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>>> # roll file if it's been 10 * 60 seconds = 600
>>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>>> # roll file if we get 50,000 log lines (~25MB)
>>> tier1.sinks.hdfssink.hdfs.rollCount=0
>>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>>> tier1.sinks.hdfssink.hdfs.rollSize=0
>>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>>> tier1.sinks.hdfssink.channel = hdfsmem
>>>
>>> When I use tier1.sinks.hdfssink.serializer=avro_event
>>> I get binary data stored into hdfs which is the
>>> CustomToAvroConvertInterceptor.intercept(event.getbody).toByteArray ,
>>> however this data cannot be parsed in hive. As a result, I see all nulls in
>>> the column values.
>>> Based on -
>>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
>>> ?
>>> all I am doing in RawAvroHiveSerializer.convert is to decode using
>>> binary Decoder.
>>> The exception I get seems to be unrelated to the code itself, hence
>>> pasting the stack trace. Will share the code if it is required to identify
>>> the rootcause:
>>>
>>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to
>>> deliver event. Exception follows.
>>> org.apache.flume.EventDeliveryException:
>>> org.apache.avro.AvroRuntimeException: not open
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>>         at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>>         at
>>> org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>>         at
>>> org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>>         at
>>> org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>>         at
>>> org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>>
>>> I can reproduce this local file system as well. In the testcase, I tried
>>> setting the file open to append=true and still encounter the same exception.
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita
>>>
>>
>>
>

Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open

Posted by Nicolas Ranc <rn...@gmail.com>.
Dear Sunita,

Thank you for your fast answer.

It's not exactly what i'm expected.
I am using apache avro C++ and i would like to deserialize an .avro file.
Just with one .avro file and Avro C++ functions i'm trying to extract the
schematic, the keys on the schema and the data at the end of the file.
("avroProgram2.avro").

I saw different functions: for example
*http://avro.apache.org/docs/1.7.6/api/cpp/html/
<http://avro.apache.org/docs/1.7.6/api/cpp/html/>* in the file:*
resolving.cc*, the program use load(...) to import schema and use it after
in the avro::resolvingDecoder.
In my case, i can't import this schema for deserialization: i just have the
.avro file and i'm searching function to store information from this file
avro file (extract the schema, the keys and values). I need these
information because i have to create new object after in Matlab - using Mex
functions (in C++).

Thanks you for your time,
Nicolas Ranc












2016-07-27 19:10 GMT+02:00 Sunita Arvind <su...@gmail.com>:

> For benefit of anyone else hitting the same issue, here is what I found:
>
> The serializer I was using was extending AbstractAvroEventSerializer. This
> class has a lot of adoption, so its not likely to be an issue in the
> abstract class. However, I got rid of this issue by overriding the
> configure method in AbstractAvroEventSerializer in my custom serializer, as
> below:
>
>
> public void configure(Context context) {
>     int syncIntervalBytes = context.getInteger("syncIntervalBytes", Integer.valueOf(2048000)).intValue();
>     String compressionCodec = context.getString("compressionCodec", "null");
>     this.writer = new ReflectDatumWriter(this.getSchema());
>     this.dataFileWriter = new DataFileWriter(this.writer);
>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>     try {
>         CodecFactory e = CodecFactory.fromString(compressionCodec);
>         this.dataFileWriter.setCodec(e);
>       *  this.dataFileWriter.create(schema,out); --> added the creation *
>     } catch (AvroRuntimeException var5) {
>         logger.warn("Unable to instantiate avro codec with name (" + compressionCodec + "). Compression disabled. Exception follows.", var5);
>     } catch (IOException io){
>         logger.warn("Could not open dataFileWriter Exception follows.", io.getStackTrace());
>     }
>
> }
>
> After this, the files are getting created in hdfs just right.
> I was also able to view the files in spark using spark-avro package.
> Hope this is the right way to do it and the solution helps someone.
> Would love to hear if anyone in avro or flume community knows of a better way to do it.
>
> regards
> Sunita
>
>
> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <su...@gmail.com>
> wrote:
>
>> Hello Experts,
>>
>> I am trying to convert a custom data source received in flume into avro
>> and push to hdfs. What I am attempting to do is
>> syslog -> flume -> flume interceptor to convert into
>> avroObject.toByteArray -> hdfs serializer which decodes the byteArray back
>> to Avro
>>
>> The flume configuration looks like:
>>
>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>> tier1.sources.syslogsource.interceptors.i1.type =
>> com.flume.CustomToAvroConvertInterceptor$Builder
>>
>> #hdfs sink for archival and batch analysis
>> tier1.sinks.hdfssink.type = hdfs
>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>
>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>
>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>> # roll file if it's been 10 * 60 seconds = 600
>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>> # roll file if we get 50,000 log lines (~25MB)
>> tier1.sinks.hdfssink.hdfs.rollCount=0
>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>> tier1.sinks.hdfssink.hdfs.rollSize=0
>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>> tier1.sinks.hdfssink.channel = hdfsmem
>>
>> When I use tier1.sinks.hdfssink.serializer=avro_event
>> I get binary data stored into hdfs which is the
>> CustomToAvroConvertInterceptor.intercept(event.getbody).toByteArray ,
>> however this data cannot be parsed in hive. As a result, I see all nulls in
>> the column values.
>> Based on -
>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
>> ?
>> all I am doing in RawAvroHiveSerializer.convert is to decode using binary
>> Decoder.
>> The exception I get seems to be unrelated to the code itself, hence
>> pasting the stack trace. Will share the code if it is required to identify
>> the rootcause:
>>
>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to
>> deliver event. Exception follows.
>> org.apache.flume.EventDeliveryException:
>> org.apache.avro.AvroRuntimeException: not open
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>         at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>         at
>> org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>         at
>> org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>         at
>> org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>         at
>> org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>         at
>> org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>
>> I can reproduce this local file system as well. In the testcase, I tried
>> setting the file open to append=true and still encounter the same exception.
>>
>> Appreciate any guidance in this regard.
>>
>> regards
>> Sunita
>>
>
>

Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open

Posted by Sunita Arvind <su...@gmail.com>.
Thanks Balazs,

I do see what you mentioned in the AbstractAvroEventSerializer, but when I
run with Visual VM debugger, I do not see control going to the aftercreate
method of AbstractAvroEventSerializer at all.From what it looked like to
me, there is an attempt to configure an output writer based on what is
passed as argument to configure(), convert it to whatever is required and
append (before creating). This is when I was not overriding configure(),
write() and afterCreate() methods and just overriding convert() and
providing the builder class.

Attached is the java serializer class which works fine now. Basically, to
give you some context on what I am deserializing, I have a flume
interceptor which converts protobuffer to an avro object. In the hdfs
serializer, I decode the bytearrays emitted by the flume interceptor into
avro object.

Please note, the below method in RawAvroHiveSerializer has no effect. I
added it while troubleshooting and did not originally override this method
when I was encountering the problem.

@Override
public void afterCreate() throws IOException {
    //noop
}

Probably I must've missed something else as no one else seems to have
encountered this problem.

Thanks for your feedback. Appreciate it. Do let me know if there is a
better way to handle the issue.

regards
Sunita


On Wed, Aug 3, 2016 at 1:46 PM, Balazs Donat Bessenyei <be...@cloudera.com>
wrote:

> Dear Sunita,
>
> Thank you for the useful advice!
>
> I've taken a look at the related source code.
> All usages of abstract class AbstractAvroEventSerializer seem to be
> calling public void afterCreate() which has your addition (
> *dataFileWriter.create*).
>
> Can you please provide a little more info about your custom serializer?
> (I'm trying to find out if somehow we could improve the code or maybe if
> this should go to the documentation.)
>
>
> Thank you
>
> Donat
>
> On Wed, Jul 27, 2016 at 10:10 AM, Sunita Arvind <su...@gmail.com>
> wrote:
>
>> For benefit of anyone else hitting the same issue, here is what I found:
>>
>> The serializer I was using was extending AbstractAvroEventSerializer.
>> This class has a lot of adoption, so its not likely to be an issue in the
>> abstract class. However, I got rid of this issue by overriding the
>> configure method in AbstractAvroEventSerializer in my custom serializer, as
>> below:
>>
>>
>> public void configure(Context context) {
>>     int syncIntervalBytes = context.getInteger("syncIntervalBytes", Integer.valueOf(2048000)).intValue();
>>     String compressionCodec = context.getString("compressionCodec", "null");
>>     this.writer = new ReflectDatumWriter(this.getSchema());
>>     this.dataFileWriter = new DataFileWriter(this.writer);
>>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>>     try {
>>         CodecFactory e = CodecFactory.fromString(compressionCodec);
>>         this.dataFileWriter.setCodec(e);
>>       *  this.dataFileWriter.create(schema,out); --> added the creation *
>>     } catch (AvroRuntimeException var5) {
>>         logger.warn("Unable to instantiate avro codec with name (" + compressionCodec + "). Compression disabled. Exception follows.", var5);
>>     } catch (IOException io){
>>         logger.warn("Could not open dataFileWriter Exception follows.", io.getStackTrace());
>>     }
>>
>> }
>>
>> After this, the files are getting created in hdfs just right.
>> I was also able to view the files in spark using spark-avro package.
>> Hope this is the right way to do it and the solution helps someone.
>> Would love to hear if anyone in avro or flume community knows of a better way to do it.
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <su...@gmail.com>
>> wrote:
>>
>>> Hello Experts,
>>>
>>> I am trying to convert a custom data source received in flume into avro
>>> and push to hdfs. What I am attempting to do is
>>> syslog -> flume -> flume interceptor to convert into
>>> avroObject.toByteArray -> hdfs serializer which decodes the byteArray back
>>> to Avro
>>>
>>> The flume configuration looks like:
>>>
>>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>>> tier1.sources.syslogsource.interceptors.i1.type =
>>> com.flume.CustomToAvroConvertInterceptor$Builder
>>>
>>> #hdfs sink for archival and batch analysis
>>> tier1.sinks.hdfssink.type = hdfs
>>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>>
>>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>>
>>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>>> # roll file if it's been 10 * 60 seconds = 600
>>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>>> # roll file if we get 50,000 log lines (~25MB)
>>> tier1.sinks.hdfssink.hdfs.rollCount=0
>>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>>> tier1.sinks.hdfssink.hdfs.rollSize=0
>>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>>> tier1.sinks.hdfssink.channel = hdfsmem
>>>
>>> When I use tier1.sinks.hdfssink.serializer=avro_event
>>> I get binary data stored into hdfs which is the
>>> CustomToAvroConvertInterceptor.intercept(event.getbody).toByteArray ,
>>> however this data cannot be parsed in hive. As a result, I see all nulls in
>>> the column values.
>>> Based on -
>>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
>>> ?
>>> all I am doing in RawAvroHiveSerializer.convert is to decode using
>>> binary Decoder.
>>> The exception I get seems to be unrelated to the code itself, hence
>>> pasting the stack trace. Will share the code if it is required to identify
>>> the rootcause:
>>>
>>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to
>>> deliver event. Exception follows.
>>> org.apache.flume.EventDeliveryException:
>>> org.apache.avro.AvroRuntimeException: not open
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>>         at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>>         at
>>> org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>>         at
>>> org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>>         at
>>> org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>>         at
>>> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>>         at
>>> org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>>
>>> I can reproduce this local file system as well. In the testcase, I tried
>>> setting the file open to append=true and still encounter the same exception.
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita
>>>
>>
>>
>

Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open

Posted by Balazs Donat Bessenyei <be...@cloudera.com>.
Dear Sunita,

Thank you for the useful advice!

I've taken a look at the related source code.
All usages of abstract class AbstractAvroEventSerializer seem to be
calling public
void afterCreate() which has your addition (*dataFileWriter.create*).

Can you please provide a little more info about your custom serializer?
(I'm trying to find out if somehow we could improve the code or maybe if
this should go to the documentation.)


Thank you

Donat

On Wed, Jul 27, 2016 at 10:10 AM, Sunita Arvind <su...@gmail.com>
wrote:

> For benefit of anyone else hitting the same issue, here is what I found:
>
> The serializer I was using was extending AbstractAvroEventSerializer. This
> class has a lot of adoption, so its not likely to be an issue in the
> abstract class. However, I got rid of this issue by overriding the
> configure method in AbstractAvroEventSerializer in my custom serializer, as
> below:
>
>
> public void configure(Context context) {
>     int syncIntervalBytes = context.getInteger("syncIntervalBytes", Integer.valueOf(2048000)).intValue();
>     String compressionCodec = context.getString("compressionCodec", "null");
>     this.writer = new ReflectDatumWriter(this.getSchema());
>     this.dataFileWriter = new DataFileWriter(this.writer);
>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>     try {
>         CodecFactory e = CodecFactory.fromString(compressionCodec);
>         this.dataFileWriter.setCodec(e);
>       *  this.dataFileWriter.create(schema,out); --> added the creation *
>     } catch (AvroRuntimeException var5) {
>         logger.warn("Unable to instantiate avro codec with name (" + compressionCodec + "). Compression disabled. Exception follows.", var5);
>     } catch (IOException io){
>         logger.warn("Could not open dataFileWriter Exception follows.", io.getStackTrace());
>     }
>
> }
>
> After this, the files are getting created in hdfs just right.
> I was also able to view the files in spark using spark-avro package.
> Hope this is the right way to do it and the solution helps someone.
> Would love to hear if anyone in avro or flume community knows of a better way to do it.
>
> regards
> Sunita
>
>
> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <su...@gmail.com>
> wrote:
>
>> Hello Experts,
>>
>> I am trying to convert a custom data source received in flume into avro
>> and push to hdfs. What I am attempting to do is
>> syslog -> flume -> flume interceptor to convert into
>> avroObject.toByteArray -> hdfs serializer which decodes the byteArray back
>> to Avro
>>
>> The flume configuration looks like:
>>
>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>> tier1.sources.syslogsource.interceptors.i1.type =
>> com.flume.CustomToAvroConvertInterceptor$Builder
>>
>> #hdfs sink for archival and batch analysis
>> tier1.sinks.hdfssink.type = hdfs
>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>
>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>
>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>> # roll file if it's been 10 * 60 seconds = 600
>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>> # roll file if we get 50,000 log lines (~25MB)
>> tier1.sinks.hdfssink.hdfs.rollCount=0
>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>> tier1.sinks.hdfssink.hdfs.rollSize=0
>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>> tier1.sinks.hdfssink.channel = hdfsmem
>>
>> When I use tier1.sinks.hdfssink.serializer=avro_event
>> I get binary data stored into hdfs which is the
>> CustomToAvroConvertInterceptor.intercept(event.getbody).toByteArray ,
>> however this data cannot be parsed in hive. As a result, I see all nulls in
>> the column values.
>> Based on -
>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
>> ?
>> all I am doing in RawAvroHiveSerializer.convert is to decode using binary
>> Decoder.
>> The exception I get seems to be unrelated to the code itself, hence
>> pasting the stack trace. Will share the code if it is required to identify
>> the rootcause:
>>
>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to
>> deliver event. Exception follows.
>> org.apache.flume.EventDeliveryException:
>> org.apache.avro.AvroRuntimeException: not open
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>         at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>         at
>> org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>         at
>> org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>         at
>> org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>         at
>> org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>         at
>> org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>
>> I can reproduce this local file system as well. In the testcase, I tried
>> setting the file open to append=true and still encounter the same exception.
>>
>> Appreciate any guidance in this regard.
>>
>> regards
>> Sunita
>>
>
>

Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open

Posted by Joe Lawson <jl...@opensourceconnections.com>.
Well done, thanks for the update!

On Wed, Jul 27, 2016 at 1:10 PM, Sunita Arvind <su...@gmail.com>
wrote:

> For benefit of anyone else hitting the same issue, here is what I found:
>
> The serializer I was using was extending AbstractAvroEventSerializer. This
> class has a lot of adoption, so its not likely to be an issue in the
> abstract class. However, I got rid of this issue by overriding the
> configure method in AbstractAvroEventSerializer in my custom serializer, as
> below:
>
>
> public void configure(Context context) {
>     int syncIntervalBytes = context.getInteger("syncIntervalBytes", Integer.valueOf(2048000)).intValue();
>     String compressionCodec = context.getString("compressionCodec", "null");
>     this.writer = new ReflectDatumWriter(this.getSchema());
>     this.dataFileWriter = new DataFileWriter(this.writer);
>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>     try {
>         CodecFactory e = CodecFactory.fromString(compressionCodec);
>         this.dataFileWriter.setCodec(e);
>       *  this.dataFileWriter.create(schema,out); --> added the creation *
>     } catch (AvroRuntimeException var5) {
>         logger.warn("Unable to instantiate avro codec with name (" + compressionCodec + "). Compression disabled. Exception follows.", var5);
>     } catch (IOException io){
>         logger.warn("Could not open dataFileWriter Exception follows.", io.getStackTrace());
>     }
>
> }
>
> After this, the files are getting created in hdfs just right.
> I was also able to view the files in spark using spark-avro package.
> Hope this is the right way to do it and the solution helps someone.
> Would love to hear if anyone in avro or flume community knows of a better way to do it.
>
> regards
> Sunita
>
>
> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <su...@gmail.com>
> wrote:
>
>> Hello Experts,
>>
>> I am trying to convert a custom data source received in flume into avro
>> and push to hdfs. What I am attempting to do is
>> syslog -> flume -> flume interceptor to convert into
>> avroObject.toByteArray -> hdfs serializer which decodes the byteArray back
>> to Avro
>>
>> The flume configuration looks like:
>>
>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>> tier1.sources.syslogsource.interceptors.i1.type =
>> com.flume.CustomToAvroConvertInterceptor$Builder
>>
>> #hdfs sink for archival and batch analysis
>> tier1.sinks.hdfssink.type = hdfs
>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>
>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>
>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>> # roll file if it's been 10 * 60 seconds = 600
>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>> # roll file if we get 50,000 log lines (~25MB)
>> tier1.sinks.hdfssink.hdfs.rollCount=0
>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>> tier1.sinks.hdfssink.hdfs.rollSize=0
>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>> tier1.sinks.hdfssink.channel = hdfsmem
>>
>> When I use tier1.sinks.hdfssink.serializer=avro_event
>> I get binary data stored into hdfs which is the
>> CustomToAvroConvertInterceptor.intercept(event.getbody).toByteArray ,
>> however this data cannot be parsed in hive. As a result, I see all nulls in
>> the column values.
>> Based on -
>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
>> ?
>> all I am doing in RawAvroHiveSerializer.convert is to decode using binary
>> Decoder.
>> The exception I get seems to be unrelated to the code itself, hence
>> pasting the stack trace. Will share the code if it is required to identify
>> the rootcause:
>>
>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to
>> deliver event. Exception follows.
>> org.apache.flume.EventDeliveryException:
>> org.apache.avro.AvroRuntimeException: not open
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>         at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>         at
>> org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>         at
>> org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>         at
>> org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>         at
>> org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>         at
>> org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>         at
>> org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>
>> I can reproduce this local file system as well. In the testcase, I tried
>> setting the file open to append=true and still encounter the same exception.
>>
>> Appreciate any guidance in this regard.
>>
>> regards
>> Sunita
>>
>
>


-- 
-Joe