Posted to dev@flume.apache.org by "Jeff Holoman (JIRA)" <ji...@apache.org> on 2015/10/11 17:52:05 UTC

[jira] [Commented] (FLUME-2760) Flafka configuration seems unclear and I get either serialized data for serialized schema in HDFS file.

    [ https://issues.apache.org/jira/browse/FLUME-2760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952319#comment-14952319 ] 

Jeff Holoman commented on FLUME-2760:
-------------------------------------

[~mqchrisw] Did you manage to get this sorted out? 

> Flafka configuration seems unclear and I get either serialized data for serialized schema in HDFS file.
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2760
>                 URL: https://issues.apache.org/jira/browse/FLUME-2760
>             Project: Flume
>          Issue Type: Question
>          Components: Sinks+Sources
>    Affects Versions: v1.6.0
>         Environment: Redhat
>            Reporter: Chris Weaver
>
> I am attempting to pull data from a Confluent (Kafka) stream where the messages are Snappy-compressed Avro byte[] and push it to an HDFS file containing the Avro data along with its schema.
>  
> First attempt resulted in pulling the data and creating files with serialized avro data but no schema at the top of the file:
>  tier1.sources  = source1
>  tier1.channels = channel1
>  tier1.sinks = sink1
>  tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
>  tier1.sources.source1.zookeeperConnect = zk:2181
>  tier1.sources.source1.topic = mytopicX
>  tier1.sources.source1.groupId = mygroupid
>  tier1.sources.source1.batchSize = 100
>  tier1.sources.source1.batchDurationMillis = 1000
>  tier1.sources.source1.kafka.consumer.timeout.ms = 100
>  tier1.sources.source1.auto.commit.enable=false
>  tier1.sources.source1.channels = channel1
>  
>  tier1.channels.channel1.type = memory
>  tier1.channels.channel1.capacity = 10000
>  tier1.channels.channel1.transactionCapacity = 10000
>  
>  tier1.sinks.sink1.type = hdfs
>  tier1.sinks.sink1.hdfs.path = /some/where/useful
>  tier1.sinks.sink1.hdfs.rollInterval = 300
>  tier1.sinks.sink1.hdfs.rollSize = 134217728
>  tier1.sinks.sink1.hdfs.rollCount = 0 
>  tier1.sinks.sink1.hdfs.fileType = DataStream
>  tier1.sinks.sink1.hdfs.writeFormat = Text
>  tier1.sinks.sink1.channel = channel1
> <try many different configs and finally create custom serializer>
> On the Nth attempt I was able to create the file with the serialized schema at the top but no data in it. The logs had errors indicating that the schema I provided was not able to union with the data messages. The only difference was in the sink config:
>  tier1.sinks.sink1.type = hdfs
>  tier1.sinks.sink1.hdfs.path = /some/where/useful
>  tier1.sinks.sink1.hdfs.rollInterval = 300
>  tier1.sinks.sink1.hdfs.rollSize = 134217728
>  tier1.sinks.sink1.hdfs.rollCount = 0 
>  tier1.sinks.sink1.hdfs.fileType = DataStream
>  tier1.sinks.sink1.hdfs.writeFormat = Text
>  # loaded to the flume machine
>  tier1.sinks.sink1.serializer = MySerializer$Builder
>  tier1.sinks.sink1.channel = channel1
> and the error given in the logs with this configuration:
> Caused by: org.apache.avro.UnresolvedUnionException: Not in union [ my very large schema goes here ]: [Event headers = {topic=mytopicX, key=df84dcd6-a801-477c-a0d8-aa7e526b672d, timestamp=1438883746194}, body.length = 383 ]
> Is there a config or method of getting your schema to be part of the output HDFS file that I am missing? With respect to the custom serializer, I am simply extending AbstractAvroEventSerializer<Event> and really just creating the schema for my data.
> Any help, docs, or recommendations would be much appreciated, as I am stuck on this.
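
For reference, one direction that may be worth trying is sketched below. It is not confirmed anywhere in this thread. It swaps the custom serializer for the AvroEventSerializer that ships with the HDFS sink, which reads the schema from an event header (flume.avro.schema.url) and writes an Avro container file with the schema at the top. The sketch assumes each event body is already a bare Avro-binary record matching that schema (no extra framing or prefix bytes), and the interceptor name and schema path are hypothetical placeholders.

```
# Sketch only, under the assumptions stated above.
# A static interceptor stamps every event with the schema location.
tier1.sources.source1.interceptors = schemaHeader
tier1.sources.source1.interceptors.schemaHeader.type = static
tier1.sources.source1.interceptors.schemaHeader.key = flume.avro.schema.url
# Hypothetical path; point this at wherever the .avsc file actually lives.
tier1.sources.source1.interceptors.schemaHeader.value = hdfs://namenode/path/to/schema.avsc

# Serializer bundled with the HDFS sink; it looks up the schema from the header above.
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
```

If the bodies carry anything other than a plain Avro-encoded record, this will likely not work as-is and the body would need to be unwrapped first.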



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)