Posted to dev@flume.apache.org by "Chris Weaver (JIRA)" <ji...@apache.org> on 2015/08/10 20:53:45 UTC

[jira] [Updated] (FLUME-2760) Flafka configuration seems unclear and I get either serialized data or serialized schema in HDFS file.

     [ https://issues.apache.org/jira/browse/FLUME-2760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Weaver updated FLUME-2760:
--------------------------------
    Description: 
I am attempting to pull data from a Confluent (Kafka) stream, where the messages are snappy-compressed Avro byte[], and push it to an HDFS file containing the Avro data along with the schema.
 
First attempt resulted in pulling the data and creating files with the serialized Avro data but no schema at the top of the file (a quick way to check for the container header is sketched right after this config):
 tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks = sink1

 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = zk:2181
 tier1.sources.source1.topic = mytopicX
 tier1.sources.source1.groupId = mygroupid
 tier1.sources.source1.batchSize = 100
 tier1.sources.source1.batchDurationMillis = 1000
 tier1.sources.source1.kafka.consumer.timeout.ms = 100
 tier1.sources.source1.auto.commit.enable=false
 tier1.sources.source1.channels = channel1
 
 tier1.channels.channel1.type = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 10000
 
 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /some/where/useful
 tier1.sinks.sink1.hdfs.rollInterval = 300
 tier1.sinks.sink1.hdfs.rollSize = 134217728
 tier1.sinks.sink1.hdfs.rollCount = 0 
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.hdfs.writeFormat = Text
 tier1.sinks.sink1.channel = channel1
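
(For reference, a quick way to confirm whether a file written by the sink actually carries the Avro container header is to try opening it as an Avro data file. A minimal sketch follows; the class name is illustrative and it assumes the file has been copied out of HDFS to a local path. If the open succeeds, the schema is embedded; if it fails with an IOException, the file is just the raw serialized bytes.)

 import java.io.File;
 import java.io.IOException;

 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;

 public class CheckAvroHeader {
   public static void main(String[] args) {
     File file = new File(args[0]);  // e.g. a file copied out of the sink's HDFS path
     try (DataFileReader<GenericRecord> reader =
              new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
       // Opening succeeded: the container header and embedded schema are present.
       System.out.println("Embedded schema: " + reader.getSchema());
       long records = 0;
       while (reader.hasNext()) {
         reader.next();
         records++;
       }
       System.out.println("Records: " + records);
     } catch (IOException e) {
       // No container header (or the file is corrupt); the body bytes were written as-is.
       System.out.println("Not an Avro container file: " + e.getMessage());
     }
   }
 }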

<tried many different configs and finally created a custom serializer>

On the Nth attempt I was able to create the file with the serialized schema at the top but no data in it. The logs had errors indicating that the schema I provided could not be unioned with the data messages; the only difference was in the sink config -

 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /some/where/useful
 tier1.sinks.sink1.hdfs.rollInterval = 300
 tier1.sinks.sink1.hdfs.rollSize = 134217728
 tier1.sinks.sink1.hdfs.rollCount = 0 
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.hdfs.writeFormat = Text
 tier1.sinks.sink1.serializer = MySerializer$Builder # loaded to the flume machine
 tier1.sinks.sink1.channel = channel1

and the error given in the logs with this configuration:
Caused by: org.apache.avro.UnresolvedUnionException: Not in union [ my very large schema goes here ]: [Event headers = {topic=mytopicX, key=df84dcd6-a801-477c-a0d8-aa7e526b672d, timestamp=1438883746194}, body.length = 383 ]

Is there a config or method of getting the schema to be part of the output HDFS file that I am missing? With respect to the custom serializer, I am simply extending AbstractAvroEventSerializer<Event> and really just creating the schema for my data (a rough sketch of that approach is included below).
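
For reference, here is a minimal sketch of the kind of serializer described above. All names, the package, and the tiny record type are placeholders (and it parameterizes on a small record class rather than on Event); the real schema would be the large one for the topic. The Builder is what the sink's serializer property points at, and convert() has to return an object that matches getSchema(), since AbstractAvroEventSerializer appends whatever convert() returns to the Avro container file. The event body is assumed here to be bare Avro binary for that schema; if the bodies carry the Confluent wire-format prefix (1 magic byte + 4-byte schema id), those 5 bytes would need to be skipped before decoding.

 package com.example.flume;  // placeholder package; the jar just needs to be on Flume's classpath

 import java.io.IOException;
 import java.io.OutputStream;

 import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.serialization.AbstractAvroEventSerializer;
 import org.apache.flume.serialization.EventSerializer;

 public class MySerializer extends AbstractAvroEventSerializer<MySerializer.MyRecord> {

   // Placeholder record type; the real one would mirror the topic's (much larger) schema.
   public static class MyRecord {
     public String id;
     public long value;
   }

   private static final Schema SCHEMA = ReflectData.get().getSchema(MyRecord.class);

   private final OutputStream out;
   private final ReflectDatumReader<MyRecord> datumReader = new ReflectDatumReader<>(SCHEMA);
   private BinaryDecoder decoder = null;

   private MySerializer(OutputStream out) {
     this.out = out;
   }

   @Override
   protected OutputStream getOutputStream() {
     return out;
   }

   @Override
   protected Schema getSchema() {
     return SCHEMA;
   }

   // convert() must return something that matches getSchema(), not the Flume Event itself;
   // here the raw Avro bytes in the event body are decoded back into a record.
   @Override
   protected MyRecord convert(Event event) {
     try {
       decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
       return datumReader.read(null, decoder);
     } catch (IOException e) {
       throw new FlumeException("Could not decode event body against the configured schema", e);
     }
   }

   // The sink's serializer property points at this Builder by fully-qualified name, e.g.
   // tier1.sinks.sink1.serializer = com.example.flume.MySerializer$Builder
   public static class Builder implements EventSerializer.Builder {
     @Override
     public EventSerializer build(Context context, OutputStream out) {
       MySerializer serializer = new MySerializer(out);
       serializer.configure(context);
       return serializer;
     }
   }
 }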

Any help, docs, or recommendations would be much appreciated as I am stuck on this.

  was:
I am attempting to pull data from a Confluent(Kafka) stream where the messages are snappy compressed avro byte[] and push to an HDFS file containing the avro data with schema.
 
First attempt resulted in pulling the data and creating files with serialized avro but no schema at the top of the file:
 tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks = sink1

 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = zk:2181
 tier1.sources.source1.topic = mytopicX
 tier1.sources.source1.groupId = mygroupid
 tier1.sources.source1.batchSize = 100
 tier1.sources.source1.batchDurationMillis = 1000
 tier1.sources.source1.kafka.consumer.timeout.ms = 100
 tier1.sources.source1.auto.commit.enable=false
 tier1.sources.source1.channels = channel1
 
 tier1.channels.channel1.type = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 10000
 
 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /some/where/useful
 tier1.sinks.sink1.hdfs.rollInterval = 300
 tier1.sinks.sink1.hdfs.rollSize = 134217728
 tier1.sinks.sink1.hdfs.rollCount = 0 
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.hdfs.writeFormat = Text
 tier1.sinks.sink1.channel = channel1


On the Nth attempt I was able to create the file with the serialized schema at the top but no data in it. The logs had errors indicating that the schema I provided could not be unioned with the data messages:
The only difference was in the sink config -

 tier1.sinks.sink1.type = hdfs
 tier1.sinks.sink1.hdfs.path = /some/where/useful
 tier1.sinks.sink1.hdfs.rollInterval = 300
 tier1.sinks.sink1.hdfs.rollSize = 134217728
 tier1.sinks.sink1.hdfs.rollCount = 0 
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.hdfs.writeFormat = Text
 tier1.sinks.sink1.serializer = MySerializer$Builder
 tier1.sinks.sink1.channel = channel1

and the error given in the logs with this configuration:
Caused by: org.apache.avro.UnresolvedUnionException: Not in union [ my very large schema goes here ]: [Event headers = {topic=mytopicX, key=df84dcd6-a801-477c-a0d8-aa7e526b672d, timestamp=1438883746194}, body.length = 383 ]

Is there a config or method of getting the schema to be part of the output HDFS file that I am missing? With respect to the custom serializer, I am simply extending AbstractAvroEventSerializer<Event> and really just creating the schema for my data.

Any help, docs, or recommendations would be much appreciated as I am stuck on this.


> Flafka configuration seems unclear and I get either serialized data or serialized schema in HDFS file.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2760
>                 URL: https://issues.apache.org/jira/browse/FLUME-2760
>             Project: Flume
>          Issue Type: Question
>          Components: Sinks+Sources
>    Affects Versions: 1.6
>         Environment: Redhat
>            Reporter: Chris Weaver
>
> I am attempting to pull data from a Confluent (Kafka) stream, where the messages are snappy-compressed Avro byte[], and push it to an HDFS file containing the Avro data along with the schema.
>  
> First attempt resulted in pulling the data and creating files with serialized avro data but no schema at the top of the file:
>  tier1.sources  = source1
>  tier1.channels = channel1
>  tier1.sinks = sink1
>  tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
>  tier1.sources.source1.zookeeperConnect = zk:2181
>  tier1.sources.source1.topic = mytopicX
>  tier1.sources.source1.groupId = mygroupid
>  tier1.sources.source1.batchSize = 100
>  tier1.sources.source1.batchDurationMillis = 1000
>  tier1.sources.source1.kafka.consumer.timeout.ms = 100
>  tier1.sources.source1.auto.commit.enable=false
>  tier1.sources.source1.channels = channel1
>  
>  tier1.channels.channel1.type = memory
>  tier1.channels.channel1.capacity = 10000
>  tier1.channels.channel1.transactionCapacity = 10000
>  
>  tier1.sinks.sink1.type = hdfs
>  tier1.sinks.sink1.hdfs.path = /some/where/useful
>  tier1.sinks.sink1.hdfs.rollInterval = 300
>  tier1.sinks.sink1.hdfs.rollSize = 134217728
>  tier1.sinks.sink1.hdfs.rollCount = 0 
>  tier1.sinks.sink1.hdfs.fileType = DataStream
>  tier1.sinks.sink1.hdfs.writeFormat = Text
>  tier1.sinks.sink1.channel = channel1
> <tried many different configs and finally created a custom serializer>
> On the Nth attempt I was able to create the file with the serialized schema at the top but no data in it. The logs had errors indicating that the schema I provided could not be unioned with the data messages; the only difference was in the sink config -
>  tier1.sinks.sink1.type = hdfs
>  tier1.sinks.sink1.hdfs.path = /some/where/useful
>  tier1.sinks.sink1.hdfs.rollInterval = 300
>  tier1.sinks.sink1.hdfs.rollSize = 134217728
>  tier1.sinks.sink1.hdfs.rollCount = 0 
>  tier1.sinks.sink1.hdfs.fileType = DataStream
>  tier1.sinks.sink1.hdfs.writeFormat = Text
>  tier1.sinks.sink1.serializer = MySerializer$Builder # loaded to the flume machine
>  tier1.sinks.sink1.channel = channel1
> and the error given in the logs with this configuration:
> Caused by: org.apache.avro.UnresolvedUnionException: Not in union [ my very large schema goes here ]: [Event headers = {topic=mytopicX, key=df84dcd6-a801-477c-a0d8-aa7e526b672d, timestamp=1438883746194}, body.length = 383 ]
> Is there a config or method of getting the schema to be part of the output HDFS file that I am missing? With respect to the custom serializer, I am simply extending AbstractAvroEventSerializer<Event> and really just creating the schema for my data.
> Any help, docs, or recommendations would be much appreciated as I am stuck on this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)