You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mohammad Tariq <do...@gmail.com> on 2016/03/25 21:56:18 UTC

Bucket records based on time(kafka-hdfs-connector)

Hi kafka gurus,

This might sound a little off the track, but I don't know where else to go.
I tried the Confluent google group but it seems to be quite
unresponsive/inactive. Please bear with me. Many thanks in advance!

I am trying to copy data from Kafka into Hive tables using
kafka-hdfs-connector provided by Confluent. While I am able to do it
successfully I was wondering how to bucket the incoming data based on time
interval. For example, I would like to have a new partition created every 5
minutes.

I tried io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner with
partition.duration.ms but I think I am doing it the wrong way. I see only
one partition in the Hive table with all the data going into that
particular partition. Something like this :

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03

And all the avro objects are getting copied into this partition.

Instead, I would like to have something like this :

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03
year=2016/month=03/day=15/hour=19/minute=08
year=2016/month=03/day=15/hour=19/minute=13

Initially connector will create the path
year=2016/month=03/day=15/hour=19/minute=03 and will continue to copy all
the incoming data into this directory for next 5 minutes, and at the start
of 6th minute it should create a new path, i.e
year=2016/month=03/day=15/hour=19/minute=08 and copy the data for next 5
minutes into this directory, and so on.

This is how my config file looks like :

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test
hdfs.url=hdfs://localhost:9000
flush.size=3
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
partition.duration.ms=300000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/
locale=en
timezone=GMT
logs.dir=/kafka-connect/logs
topics.dir=/kafka-connect/topics
hive.integration=true
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD

It would be really helpful if someone could point me in the right
direction. I would be glad to share more details in case it's required.
Don't want to make this email look like one that never ends.

Thank you so much for your valuable time!


[image: http://]

Tariq, Mohammad
about.me/mti
[image: http://]
<http://about.me/mti>