Posted to dev@flume.apache.org by Roshan Naik <ro...@hortonworks.com> on 2015/01/29 06:43:01 UTC

Puzzled with Avro serializer + HDFS Sink

If Flume is fed an Avro file via the spooldir source and the data is then
drained into HDFS using the Avro serializer, I am noticing that what ends
up in HDFS is an Avro file with the schema below:


{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}


Basically, each record is not the original Avro record; it is instead the
Flume event object (i.e. headers + body), and the body field has the
original Avro record embedded in it.

If the event header has a schema, then the schema ends up in each record of
the output file.
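
For illustration, here is roughly what one record in the output file looks
like when dumped with avro-tools tojson (values abbreviated and hand-written,
not copied from real output; the flume.avro.schema.literal header name is
what I believe the AVRO deserializer sets when schemaType = LITERAL):

{
  "headers" : {
    "flume.avro.schema.literal" : "{\"type\":\"record\", ... original schema ...}"
  },
  "body" : "<the original avro datum as opaque bytes>"
}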

AFAICT, this is undesirable. What is desired is to more or less recreate the
original file contents.

*My question* is how to configure Flume so that the original records are
written to the destination file instead of being wrapped in FlumeEvent objects.


*details:*

I attached a debugger to the running agent and observed that the spooldir
source is deserializing the Avro file correctly into a Flume event (header
with the schema, body with the Avro datum).

But on the HDFS sink side, the Avro serializer is writing out the *whole*
FlumeEvent object instead of serializing just the event body.


Below is the config I used:

agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory

agent.sources = sd
agent.sources.sd.type = spooldir
agent.sources.sd.spoolDir = /tmp/avro
agent.sources.sd.deserializer = AVRO
agent.sources.sd.deserializer.schemaType = LITERAL
agent.sources.sd.channels = memoryChannel

agent.sinks = hdfsSink
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path = /tmp/flumetest/avro
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.serializer = avro_event
agent.sinks.hdfsSink.hdfs.callTimeout = 5000


-roshan


Re: Puzzled with Avro serializer + HDFS Sink

Posted by Roshan Naik <ro...@hortonworks.com>.
That does it. Looks like it's gone undocumented.
-roshan

________________________________________
From: Hari Shreedharan <hs...@cloudera.com>
Sent: Wednesday, January 28, 2015 10:29 PM
To: dev@flume.apache.org
Cc: dev@flume.apache.org
Subject: Re: Puzzled with Avro serializer + HDFS Sink

Take a look at the AvroEventSerializer: https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java


You’d need to specify the schema in a file and have that added to the event headers.




Thanks, Hari


Re: Puzzled with Avro serializer + HDFS Sink

Posted by Hari Shreedharan <hs...@cloudera.com>.
Take a look at the AvroEventSerializer: https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AvroEventSerializer.java


You’d need to specify the schema in a file and have that added to the event headers.
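
Roughly something like this, off the top of my head (an untested sketch: the
$Builder serializer value, the flume.avro.schema.url header name, and the
schema file path are from memory, so verify them against the class above):

# use the HDFS sink's AvroEventSerializer instead of avro_event
agent.sinks.hdfsSink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent.sinks.hdfsSink.hdfs.fileType = DataStream

# make the schema location available to the serializer by setting the
# flume.avro.schema.url header on every event, e.g. via a static
# interceptor on the source (the schema.avsc path is hypothetical)
agent.sources.sd.interceptors = attachSchema
agent.sources.sd.interceptors.attachSchema.type = static
agent.sources.sd.interceptors.attachSchema.key = flume.avro.schema.url
agent.sources.sd.interceptors.attachSchema.value = file:///tmp/avro/schema.avsc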




Thanks, Hari
