Posted to user@flume.apache.org by "Kumar, Ashok 6. (Nokia - IN/Bangalore)" <as...@nokia.com> on 2017/07/03 11:23:30 UTC

Unable to save streaming data in Hive as RCFile format

Hi,

I am using Kafka as the source and Hive as the sink. I am receiving streaming data from Kafka and trying to save it into Hive in RCFile format, but I am getting the following error:

2017-07-03 07:18:59,128 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:342)] k1: Creating Writer to Hive end point : {metaStoreUri='thrift://10.197.53.103:9083', database='default', table='test', partitionVals=[] }
2017-07-03 07:18:59,218 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.ClassCastException: org.apache.hadoop.hive.ql.io.RCFileOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.AcidOutputFormat
       at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:267)
       at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
       at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
       at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.ql.io.RCFileOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.AcidOutputFormat
       at org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:75)
       at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
       at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:66)
       at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
       at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
       at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:295)
       at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:253)


This is the command used for table creation:

CREATE TABLE test (name STRING, age BIGINT)
CLUSTERED BY (name) INTO 1 BUCKETS
STORED AS RCFILE
TBLPROPERTIES ('rcfile.compress' = 'NONE', 'transactional' = 'true');
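
From the stack trace it looks like the streaming writer casts the table's output format to AcidOutputFormat, and as far as I can tell only ORC implements that interface, so I am wondering whether RCFile is supported by the Hive sink at all. For reference, a sketch of the same table stored as ORC, which I could try instead:

-- sketch: same schema, but stored as ORC, which (as far as I
-- understand) is the only storage format the Hive streaming API
-- currently accepts for transactional tables
CREATE TABLE test (name STRING, age BIGINT)
CLUSTERED BY (name) INTO 1 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');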



This is the conf file:


agent.sources = s1
agent.sinks = k1
agent.channels = c1

# properties of s1
#agent.sources.s1.type = syslogtcp
#agent.sources.s1.bind = 0.0.0.0
#agent.sources.s1.port = 41415

agent.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.s1.zookeeperConnect = x.x.x.x:31886
agent.sources.s1.kafka.bootstrap.servers = x.x.x.x:1025,x.x.x.x:1025,x.x.x.x:1025
agent.sources.s1.kafka.topics = kite1

# properties of c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 1000000

# bind the source and sink to c1
agent.sources.s1.channels = c1
agent.sinks.k1.channel = c1

# properties of k1
#agent.sinks.k1.type = logger
agent.sinks.k1.type = hive
agent.sinks.k1.hive.metastore = thrift://x.x.x.x:9083
agent.sinks.k1.hive.database = default
agent.sinks.k1.hive.table = test
agent.sinks.k1.callTimeout = 100000
agent.sinks.k1.useLocalTimeStamp = false
agent.sinks.k1.round = true
agent.sinks.k1.roundValue = 10
agent.sinks.k1.roundUnit = minute
agent.sinks.k1.serializer = DELIMITED
agent.sinks.k1.serializer.delimiter = ","
agent.sinks.k1.serializer.serdeSeparator = ','
agent.sinks.k1.serializer.fieldnames = name,age
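
To double-check what the sink actually sees for the table, the storage format and the 'transactional' property can be inspected from the Hive shell (both statements are standard HiveQL):

-- inspect the table's InputFormat/OutputFormat and SerDe
DESCRIBE FORMATTED test;
-- confirm the 'transactional' table property is set
SHOW TBLPROPERTIES test;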

Please look into this issue.

Regards,
Ashok