You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "Ramgopal N (JIRA)" <ji...@apache.org> on 2017/08/01 04:32:00 UTC

[jira] [Updated] (FLUME-3138) SchemaURL from flume configuration is dropping the flume events expecting the schema url to be added in event header as against FLUME-2810

     [ https://issues.apache.org/jira/browse/FLUME-3138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ramgopal N updated FLUME-3138:
------------------------------
    Issue Type: Bug  (was: Question)

> SchemaURL from flume configuration is dropping the flume events expecting the schema url to be added in event header as against FLUME-2810
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3138
>                 URL: https://issues.apache.org/jira/browse/FLUME-3138
>             Project: Flume
>          Issue Type: Bug
>          Components: Configuration
>    Affects Versions: 1.7.0
>         Environment: Flume1.7
>            Reporter: Ramgopal N
>
> I have avro data coming to kafka topic. Flume reads the events from kafka and then using kite dataset with hdfs sink is put into HDFS as parquet data.
> Flume config is as below:
> agent.sinks.k1.channel = c1
> agent.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
> agent.sinks.k1.kite.dataset.uri = dataset:hdfs://namenodeHA/kite/avro_to_parquet_item2
> agent.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
> agent.sinks.k1.hdfs.filePrefix=parquetdata
> agent.sinks.k1.hdfs.fileSuffix = .parquet
> agent.sinks.k1.hdfs.fileType=DataStream
> #agent.sinks.k1.hdfs.rollInterval=30
> #agent.sinks.k1.hdfs.rollCount=1
> #agent.sinks.k1.hdfs.batchSize=1
> agent.sinks.k1.kite.batchSize=2
> agent.sinks.k1.kite.rollInterval=30
> agent.sinks.k1.kite.flushable.commitOnBatch=true
> #agent.sinks.k1.hdfs.path = hdfs://namenodeHA/user/flumetest
> #agent.sinks.k1.serializer.compressionCodec = snappy
> agent.sinks.k1.serializer.schemaURL = hdfs://namenodeHA/kite/item.avsc
> I am getting the below exception in the flume logs:
> 2017-07-31 06:18:40,796 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:153)] Got brand-new compressor [.snappy]
> 2017-07-31 06:18:40,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize(FileSystemWriter.java:147)] Opened output appender ParquetAppender{path=hdfs://namenodeHA/kite/avro_to_parquet_item2/.6d1019b3-96c4-4334-b737-af260d17aac4.parquet.tmp, schema={"type":"record","name":"item","namespace":"item.avro","fields":[{"name":"i_item_sk","type":..................................{"name":"i_manager_id","type":["null","int"]},{"name":"i_product_name","type":["null","string"]}]}, fileSystem=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2077692400_17, ugi=root (auth:SIMPLE)]], avroParquetWriter=parquet.avro.AvroParquetWriter@31ffba30} for hdfs://namenodeHA/kite/avro_to_parquet_item2/6d1019b3-96c4-4334-b737-af260d17aac4.parquet
> 2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:39)] Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
> 2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
> 	at org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:42)
> 	at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:375)
> 	at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:301)
> 	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> 	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
> 	at org.apache.flume.sink.kite.parser.AvroParser.schema(AvroParser.java:185)
> 	at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:155)
> 	at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:56)
> 	at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:366)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)